当前位置: 首页 > news >正文

open webui源码分析6-Function

        一、Functions简介

        可以把Tools作为依赖于外部服务的插件,Functions就是内部插件,二者都是用来增强open webui的能力的。Functions是轻量的,高度可定制的,并且是用纯Python编写的,所以你可以自由地创建任何东西——从新的人工智能工作流到与你使用的任何东西的集成,比如谷歌搜索或家庭助理。

       在open webui中的Function包括三种类型:Pipe Function、Filter Function和Action Function。

        Pipe类型的Function用于自定义Agent或模型,用户在对话中可以像普通的模型那样选择使用。

        Filter类型的Function用于对往返大模型的数据进行处理,从而可以在不中断对话的前提下,拦截对话内容并进行修改或其他处理,比如日志。过滤器一般用于轻量级处理,包括:发送数据到监控平台、记录日志、修改用户输入、阻断有害消息、翻译和限流等。

        Action类型的Function用来对聊天界面的按钮进行定制。这些按钮出现在单个聊天消息下方,让您可以方便地一键访问您定义的操作。

        本文仅对Action类型的Function进行解析。        

        二、导入一个Function

        1)进入open webui社区的Functions页面,选择一个Function,这里以Save Outputs为例

        2)点击Save Outputs,进入如下页面

        3)点击Get,在对话框填写你的open webui的地址

        4)点击Import to WebUI

        进入open webui页面,显示函数代码,核心代码为把大模型的输出写入本地文件中,完整源码如下:

"""
title: save_outputs
author: stefanpietrusky
author_url: https://downchurch.studio/
inspiration: add_to_memories_action_button @pad4651
instruction: you need to mount the container folder /app/data with a local folder when creating the container! „--mount type=bind,source="FOLDER PATH\docker_data",target=/app/data“
icon_url:  
version: 0.1
"""

import os
from pydantic import BaseModel, Field
from typing import Optional


class Action:
    class Valves(BaseModel):
        pass

    class UserValves(BaseModel):
        show_status: bool = Field(
            default=True, description="Show status of the action."
        )
        pass

    def __init__(self):
        self.valves = self.Valves()
        pass

    async def action(
        self,
        body: dict,
        __user__=None,
        __event_emitter__=None,
        __event_call__=None,
    ) -> Optional[dict]:
        print(f"action:{__name__}")

        user_valves = __user__.get("valves")
        if not user_valves:
            user_valves = self.UserValves()

        if __event_emitter__:
            last_assistant_message = body["messages"][-1]

            if user_valves.show_status:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {"description": "Saving to file", "done": False},
                    }
                )

            try:
                directory = "/app/data"
                if not os.path.exists(directory):
                    os.makedirs(directory)

                file_path = os.path.join(directory, "saved_outputs.txt")
                with open(file_path, "a") as file:
                    file.write(f"{last_assistant_message['content']}\n\n")
                print("Output saved to file in the container, accessible on the host.")

            except Exception as e:
                print(f"Error saving output to file: {str(e)}")
                if user_valves.show_status:
                    await __event_emitter__(
                        {
                            "type": "status",
                            "data": {
                                "description": "Error Saving to File",
                                "done": True,
                            },
                        }
                    )

            if user_valves.show_status:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {"description": "Output Saved", "done": True},
                    }
                )
 

        因为可能是恶意代码,所以需要阅读检查代码。检查无误后,可以保存,该函数便作为插件进入open webui体系中。

          三、具体使用

        函数生效后,在大模型返回对一个问题的应答后,在工具栏显示该函数图标。

        用户点击该链接,则保存当前大模型输出写入到文件中。

        三、源码分析

        1)数据模型

       Function数据保存在Function表中,表定义如下:

        其中:

                id:函数唯一标识

                userid:用户唯一标识

                name:函数名

                type:函数类型 filter|pipe|action

                content:方法源代码

               meta:元数据

                valves:阈值

                is_active:是否被激活(激活后才可见)

                is_global:全局还是局部(仅某个用户使用)

        2)导入函数

        从open webui社区页面点击 Import to WebUI时,浏览器启动一个新页面,并提交代码格式化请求到/app/v1/utils/code/format,后端调用black模块进行严格格式化处理,并把格式化后的代码返回前端。

@router.post("/code/format")
async def format_code(form_data: CodeForm, user=Depends(get_admin_user)):
    try:
        formatted_code = black.format_str(form_data.code, mode=black.Mode())
        return {"code": formatted_code}
    except black.NothingChanged:
        return {"code": form_data.code}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

        完成格式化处理后,后端再提交创建Function请求到 /api/v1/functions/create。请求数据为:

{
    "id": "save_outputs",
    "name": "Save Outputs",
    "meta": {
        "description": "Save outputs locally on your computer.",
        "manifest": {
            "title": "save_outputs",
            "author": "stefanpietrusky",
            "author_url": "https://downchurch.studio/",
            "inspiration": "add_to_memories_action_button @pad4651",
            "instruction": "you need to mount the container folder /app/data with a local folder when creating the container! „--mount type=bind,source=\"FOLDER PATH\\docker_data\",target=/app/data\"",
            "icon_url": "",
            "version": "0.1"
        },
        "type": "action",
        "user": {
            "id": "9e4f4854-71d9-429a-99b9-9338a393de9e",
            "username": "pietrusky",
            "name": "",
            "createdAt": 1724186428,
            "role": null,
            "verified": false
        },
        "id": "542145b0-59a0-44f2-86f1-dd2f1e64d705"
    },

    #content由注释和源代码组成
    "content":  "\"\"\"\ntitle: save_outputs\nauthor: stefanpietrusky\nauthor_url: https://downchurch.studio/\ninspiration: add_to_memories_action_button @pad4651\ninstruction: you need to mount the container folder /app/data with a local folder when creating the container! „--mount type=bind,source=\"FOLDER PATH\\docker_data\",target=/app/data\"\nicon_url: \nversion: 0.1\n\"\"\"\n\nimport os\nfrom pydantic import BaseModel, Field\nfrom typing import Optional\n\n\nclass Action:\n    class Valves(BaseModel):\n        pass\n\n    class UserValves(BaseModel):\n        show_status: bool = Field(\n            default=True, description=\"Show status of the action.\"\n        )\n        pass\n\n    def __init__(self):\n        self.valves = self.Valves()\n        pass\n\n    async def action(\n        self,\n        body: dict,\n        __user__=None,\n        __event_emitter__=None,\n        __event_call__=None,\n    ) -> Optional[dict]:\n        print(f\"action:{__name__}\")\n\n        user_valves = __user__.get(\"valves\")\n        if not user_valves:\n            user_valves = self.UserValves()\n\n        if __event_emitter__:\n            last_assistant_message = body[\"messages\"][-1]\n\n            if user_valves.show_status:\n                await __event_emitter__(\n                    {\n                        \"type\": \"status\",\n                        \"data\": {\"description\": \"Saving to file\", \"done\": False},\n                    }\n                )\n\n            try:\n                directory = \"/app/data\"\n                if not os.path.exists(directory):\n                    os.makedirs(directory)\n\n                file_path = os.path.join(directory, \"saved_outputs.txt\")\n                with open(file_path, \"a\") as file:\n                    file.write(f\"{last_assistant_message['content']}\\n\\n\")\n                print(\"Output saved to file in the container, accessible on the host.\")\n\n            except Exception as e:\n                print(f\"Error saving output to file: {str(e)}\")\n                if user_valves.show_status:\n                    await __event_emitter__(\n                        {\n                            \"type\": \"status\",\n                            \"data\": {\n                                \"description\": \"Error Saving to File\",\n                                \"done\": True,\n                            },\n                        }\n                    )\n\n            if user_valves.show_status:\n                await __event_emitter__(\n                    {\n                        \"type\": \"status\",\n                        \"data\": {\"description\": \"Output Saved\", \"done\": True},\n                    }\n                )"
}

       对应函数源码如下:

处理流程如下:

1)防错处理,判断函数名是否符合python标识符的命名规则,不符合则报错

2)对源中import的模块名进行替换

3)加载源码成为可使用的模块

4)把该函数加载到全局FUNCTIONS中,供后继使用

5)为函数创建缓存目录

@router.post("/create", response_model=Optional[FunctionResponse])
async def create_new_function(
    request: Request, form_data: FunctionForm, user=Depends(get_admin_user)
):
    if not form_data.id.isidentifier(): #对id进行校验
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Only alphanumeric characters and underscores are allowed in the id",
        )

    form_data.id = form_data.id.lower()

    #从Function表查询该函数是否已经入库

    function = Functions.get_function_by_id(form_data.id)
    if function is None: 
        try:

            #用本地模块名,替换源码中的模块名,比如用from open_webui.utils替换from utils 
            form_data.content = replace_imports(form_data.content)

            #把函数加载为模块
            function_module, function_type, frontmatter = load_function_module_by_id(
                form_data.id,
                content=form_data.content,
            )
            form_data.meta.manifest = frontmatter

            #把Function实例增加到全局FUNCTIONS中

            FUNCTIONS = request.app.state.FUNCTIONS
            FUNCTIONS[form_data.id] = function_module

            #把函数数据插入到FUNCTION表中

            function = Functions.insert_new_function(user.id, function_type, form_data)

            #为该方法创建目录/app/backend/data/cache/functions/{函数名}

            function_cache_dir = CACHE_DIR / "functions" / form_data.id
            function_cache_dir.mkdir(parents=True, exist_ok=True)

            if function:
                return function
            else:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=ERROR_MESSAGES.DEFAULT("Error creating function"),
                )
        except Exception as e:
            log.exception(f"Failed to create a new function: {e}")
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=ERROR_MESSAGES.DEFAULT(e),
            )
    else: #如果已经入库,则报错
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.ID_TAKEN,
        )
 

        在该方法中的核心代码是load_function_module_by_id,load_function_module_by_id实现代码动态加载,重点分析一下。

def load_function_module_by_id(function_id: str, content: str | None = None):

    #如果参数content为None,则从数据库查询
    if content is None:
        function = Functions.get_function_by_id(function_id)
        if not function:
            raise Exception(f"Function not found: {function_id}")
        content = function.content

        content = replace_imports(content)#替换源码中的导入的模块名
        Functions.update_function_by_id(function_id, {"content": content})#更新数据库content
    else:#从content提取元数据
        frontmatter = extract_frontmatter(content)

        #安装依赖模块
        install_frontmatter_requirements(frontmatter.get("requirements", ""))

   

    module_name = f"function_{function_id}"

    #创建function_{function_id}模块,比如function_save_outputs
    module = types.ModuleType(module_name)

    #加载模块到sys_modules
    sys.modules[module_name] = module

    # 创建临时文件,用于存储函数的源代码
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    temp_file.close()
    try:

        #把源代码写入临时文件
        with open(temp_file.name, "w", encoding="utf-8") as f:
            f.write(content)
        module.__dict__["__file__"] = temp_file.name #设置模块的__file__为临时文件名

        # 在本模块的命名空间运行源代码,完成模块源码的载入
        exec(content, module.__dict__)
        frontmatter = extract_frontmatter(content)
        log.info(f"Loaded module: {module.__name__}")

        # 根据Function类型,返回对应类的实例
        if hasattr(module, "Pipe"):#返回管道实例
            return module.Pipe(), "pipe", frontmatter
        elif hasattr(module, "Filter"): #返回过滤器实例
            return module.Filter(), "filter", frontmatter
        elif hasattr(module, "Action"):
            return module.Action(), "action", frontmatter  #返回Action实例
        else:
            raise Exception("No Function class found in the module")
    except Exception as e:
        log.error(f"Error loading module: {function_id}: {e}")
        # Cleanup by removing the module in case of error
        del sys.modules[module_name]

        Functions.update_function_by_id(function_id, {"is_active": False})
        raise e
    finally:
        os.unlink(temp_file.name)

        3)执行函数

        用户在对话界面点击按钮执行函数时,后端入口为http://{ip:port}/api/chat/actions/{函数名},后端调用该函数执行对应的操作。对应入口函数为chat_action。

该方法和简洁,主要是调用chat_action_handle。

@app.post("/api/chat/actions/{action_id}")
async def chat_action(
    request: Request, action_id: str, form_data: dict, user=Depends(get_verified_user)
):
    try:
        model_item = form_data.pop("model_item", {})

        if model_item.get("direct", False):
            request.state.direct = True
            request.state.model = model_item

        return await chat_action_handler(request, action_id, form_data, user)
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        )

        chat_action_handle实际对应 open_webui.utils.chat模块中的chat_action方法,具体源码如下:

async def chat_action(request: Request, action_id: str, form_data: dict, user: Any):
    if "." in action_id: #如果action_id是多层,则用'.'分割
        action_id, sub_action_id = action_id.split(".")
    else:
        sub_action_id = None

    action = Functions.get_function_by_id(action_id)#从数据库查找Function是否存在
    if not action:
        raise Exception(f"Action not found: {action_id}")

    #以下代码确定使用的模型

    if not request.app.state.MODELS: 
        await get_all_models(request, user=user)

    if getattr(request.state, "direct", False) and hasattr(request.state, "model"):
        models = {
            request.state.model["id"]: request.state.model,
        }
    else:
        models = request.app.state.MODELS

    data = form_data
    model_id = data["model"]

    if model_id not in models:
        raise Exception("Model not found")
    model = models[model_id]

    #通过websocket发送数据到前端

    __event_emitter__ = get_event_emitter(
        {
            "chat_id": data["chat_id"],
            "message_id": data["id"],
            "session_id": data["session_id"],
            "user_id": user.id,
        }
    )
    __event_call__ = get_event_call(
        {
            "chat_id": data["chat_id"],
            "message_id": data["id"],
            "session_id": data["session_id"],
            "user_id": user.id,
        }
    )

    #根据action_id获取模块

    function_module, _, _ = get_function_module_from_cache(request, action_id)

    #阀门处理

    if hasattr(function_module, "valves") and hasattr(function_module, "Valves"):
        valves = Functions.get_function_valves_by_id(action_id)
        function_module.valves = function_module.Valves(**(valves if valves else {}))

    if hasattr(function_module, "action"):
        try:
            action = function_module.action#从Action类中获取action方法

            # 得到函数签名
            sig = inspect.signature(action)
            params = {"body": data}

            # Extra parameters to be passed to the function
            extra_params = {
                "__model__": model,
                "__id__": sub_action_id if sub_action_id is not None else action_id,
                "__event_emitter__": __event_emitter__,
                "__event_call__": __event_call__,
                "__request__": request,
            }

            #把extra_params中的项中与函数签名中的参数匹配的项加入到params中
            for key, value in extra_params.items():
                if key in sig.parameters:
                    params[key] = value

            if "__user__" in sig.parameters:

                #如果函数签名中有__user__,则在调用参数中增加用户相关阀门设置
                __user__ = user.model_dump() if isinstance(user, UserModel) else {}

                try:
                    if hasattr(function_module, "UserValves"):
                        __user__["valves"] = function_module.UserValves(
                            **Functions.get_user_valves_by_id_and_user_id(
                                action_id, user.id
                            )
                        )
                except Exception as e:
                    log.exception(f"Failed to get user values: {e}")

                params = {**params, "__user__": __user__}

            if inspect.iscoroutinefunction(action): #如果action方法是协程,则await调用
                data = await action(**params)
            else: #非协程则直接调用
                data = action(**params)

        except Exception as e:
            return Exception(f"Error: {e}")

    return data
 

http://www.xdnf.cn/news/1344745.html

相关文章:

  • FPGA学习笔记——简单的IIC读写EEPROM
  • FPGA高端项目:图像采集+Aurora 8B10B+UDP图传架构,基于GTH高速收发器的光口转网口,提供工程源码和技术支持
  • IntelliJ IDEA 常用快捷键笔记(Windows)
  • SRE系列(二) | 从可用性到 SLI/SLO
  • 【数据结构】B 树——高度近似可”独木成林“的榕树——详细解说与其 C 代码实现
  • MySQL编程开发(了解)
  • 08高级语言逻辑结构到汇编语言之逻辑结构转换 continue break 完结汇编按逻辑结构
  • Redis---事务
  • 51单片机-驱动步进电机模块教程
  • C#_组合优于继承的实际应用
  • Kafka Broker 核心原理全解析:存储、高可用与数据同步
  • 如何从根源上理解并解决前端的CORS跨域问题
  • 【PSINS工具箱】MATLAB例程,二维平面上的组合导航,EKF融合速度、位置和IMU数据,4维观测量
  • Unreal Engine ClassName Rule
  • Python 中 SQLAlchemy 和 MySQLdb 的关系
  • IKE 与 ISAKMP 核心笔记
  • 微信扫码登陆 —— 接收消息
  • 复合设计模式
  • 加密货币与区块链:六大刑事重灾区
  • 深入理解 Spring Boot Starter:简化依赖管理与自动配置的利器
  • 110、【OS】【Nuttx】【周边】效果呈现方案解析:查找最新构建件
  • 深入理解 hash -r:解决 Linux 命令缓存难题的关键密钥
  • 自定义rabbitmq的ConnectionFactory配置
  • RabbitMQ深度剖析:从基础到高级进阶实战
  • 乐迪信息:AI摄像机+刮板机人员入侵检测:杜绝井下安全事故
  • 爬虫基础学习-配置代理、以及项目实践
  • 关于爬虫的基本步骤说明【爬虫七步骤】
  • jenkins实现分布式构建并自动发布到远程服务器上 jenkins实现自动打包编译发布远程服务器
  • Laravel分布式全链路追踪实战
  • 【机器学习深度学习】LMDeploy的分布式推理实现