当前位置: 首页 > ai >正文

open webui源码分析7—过滤器

        很多Web开发的小伙伴肯定非常熟悉过滤器的使用,正如SpringCloudGateway中的过滤器可以读写Request和Response一样,在open webui中过滤器可以对往返大模型的数据进行处理,从而可以在不中断对话的前提下,拦截对话内容并进行修改或其他处理。具体来说包括:

       1) 修改用户输入:在输入数据到达大模型之前对其进行调整,例如提高清晰度、添加上下文、净化文本,或重新格式化消息以满足特定要求。

        2)拦截模型输出:针对流式情况。捕获并调整大模型生成的响应。这对于实时修改很有用,比如过滤掉敏感信息或格式化输出以提高可读性。

        3)修改模型输出:针对非流式情况。接受大模型的完整响应,经过处理后再将其显示给用户。这可以帮助优化、记录或调整数据,以获得更清晰的用户体验。

        过滤器一般用于轻量级处理,包括:发送数据到监控平台、记录日志、修改用户输入、阻断有害消息、翻译和限流等。

        根据系列6中分析,过滤器也是Function的一种。具体的导入也跟Function一样。本文以一个简单的过滤器为例,该过滤器获取当前的日期和时间作为上下文,增加到消息中发送给大模型。过滤器代码如下,不做分析,供参考:

"""
title: Message Date And Time
author: benasraudys
author_url: https://github.com/benasraudys
funding_url: https://github.com/benasraudys
description: Gives model current date and time context for each message. Don't forget to adjust the timezone in the settings.
version: 0.1.1
required_open_webui_version: 0.6.4
"""

import datetime
import os
from pydantic import BaseModel, Field
from typing import Callable, Awaitable, Any, Optional


class Filter:
    """Open WebUI filter that gives the model the current date and time.

    The ``inlet`` hook builds a one-line description of the user's local
    date, weekday, season, time of day and UTC offset, and inserts it as a
    system message at the front of ``body["messages"]``.
    """

    class Valves(BaseModel):
        # Whole-hour part of the UTC offset; negative for offsets behind UTC.
        timezone_hours: int = Field(
            default=0,
            description="Timezone offset hours (e.g., 5 for UTC+5:30, -4 for UTC-4:00)",
        )
        # Minute part of the UTC offset; applied in the direction given by
        # timezone_hours (see inlet).
        timezone_minutes: int = Field(
            default=0,
            description="Timezone offset minutes (e.g., 30 for UTC+5:30, 45 for UTC-4:45)",
        )
        # Swaps the season names (March-May becomes Autumn, and so on).
        southern_hemisphere: bool = Field(
            default=False,
            description="Enable if you're in the Southern Hemisphere (Australia, South America, etc.)",
        )

    def __init__(self):
        """Initialise the valves from the ``DATETIME_*`` environment variables."""
        self.valves = self.Valves(
            timezone_hours=int(os.getenv("DATETIME_TIMEZONE_HOURS", "0")),
            timezone_minutes=int(os.getenv("DATETIME_TIMEZONE_MINUTES", "0")),
            southern_hemisphere=(
                os.getenv("DATETIME_SOUTHERN_HEMISPHERE", "false").lower() == "true"
            ),
        )

    def get_season(self, month, southern_hemisphere=False):
        """Return the season name for a month number.

        Args:
            month: Month number (1-12).
            southern_hemisphere: When true, return the opposite season.

        Returns:
            One of "Spring", "Summer", "Autumn" or "Winter". Any month outside
            3-11 falls into the December-February bucket.
        """
        if 3 <= month <= 5:
            northern, southern = "Spring", "Autumn"
        elif 6 <= month <= 8:
            northern, southern = "Summer", "Winter"
        elif 9 <= month <= 11:
            northern, southern = "Autumn", "Spring"
        else:
            northern, southern = "Winter", "Summer"
        return southern if southern_hemisphere else northern

    def get_time_of_day(self, hour):
        """Return "Morning", "Afternoon", "Evening" or "Night" for an hour (0-23)."""
        if 5 <= hour < 12:
            return "Morning"
        if 12 <= hour < 17:
            return "Afternoon"
        if 17 <= hour < 21:
            return "Evening"
        return "Night"

    async def inlet(
        self,
        body: dict,
        __event_emitter__: Callable[[Any], Awaitable[None]],
        __request__: Any,
        __user__: Optional[dict] = None,
        __model__: Optional[dict] = None,
    ) -> dict:
        """Insert a system message with the current date/time into the request.

        Args:
            body: Request payload; its ``messages`` list is modified in place
                (or created when missing or not a list).
            __event_emitter__, __request__, __user__, __model__: Part of the
                hook signature; not used by this filter.

        Returns:
            The same ``body`` dict with the time-context message at index 0.
        """
        timezone_hours = self.valves.timezone_hours
        timezone_minutes = self.valves.timezone_minutes

        # The minute valve is documented as "45 for UTC-4:45", i.e. it extends
        # the offset in the direction of the hour valve. Simply adding it to a
        # negative hour offset would give UTC-3:15 instead.
        if timezone_hours < 0:
            total_offset_minutes = timezone_hours * 60 - abs(timezone_minutes)
        else:
            total_offset_minutes = timezone_hours * 60 + timezone_minutes

        # datetime.utcnow() is deprecated; take an aware UTC timestamp and
        # shift its wall-clock fields. Only date/time fields are formatted
        # below, so the tzinfo attached to `now` is never printed.
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        now = now_utc + datetime.timedelta(minutes=total_offset_minutes)

        formatted_date = now.strftime("%B %d, %Y")
        formatted_time = now.strftime("%H:%M:%S")
        day_of_week = now.strftime("%A")

        # Render the offset as "UTC+H" or "UTC+H:MM" (minutes only when non-zero).
        sign = "+" if total_offset_minutes >= 0 else "-"
        hours_offset, minutes_offset = divmod(abs(total_offset_minutes), 60)
        timezone_str = f"UTC{sign}{hours_offset}"
        if minutes_offset != 0:
            timezone_str += f":{minutes_offset:02d}"

        season = self.get_season(now.month, self.valves.southern_hemisphere)
        time_of_day = self.get_time_of_day(now.hour)

        context = f"Current date is {day_of_week}, {formatted_date}, {season}, {time_of_day}, the user time is {formatted_time} {timezone_str}"

        datetime_message = {
            "role": "system",
            "content": f"Time context: {context}. ",
        }

        if "messages" in body and isinstance(body["messages"], list):
            body["messages"].insert(0, datetime_message)
        else:
            body["messages"] = [datetime_message]

        return body

        加载过滤器后,请求数据如下,其中model_item.filters为空,实际会使用数据库中的过滤器。

{
"stream": true,
"model": "qwen:0.5b",
"messages": [
{
"role": "user",
"content": "请用100个字以内客观评价一下李二曲"
}
],
"params": {},
"tool_servers": [],
"features": {
"image_generation": false,
"code_interpreter": false,
"web_search": false,
"memory": false
},
"variables": {
"{{USER_NAME}}": "acaluis",
"{{USER_LOCATION}}": "Unknown",
"{{CURRENT_DATETIME}}": "2025-08-22 13:03:09",
"{{CURRENT_DATE}}": "2025-08-22",
"{{CURRENT_TIME}}": "13:03:09",
"{{CURRENT_WEEKDAY}}": "Friday",
"{{CURRENT_TIMEZONE}}": "Etc/GMT-8",
"{{USER_LANGUAGE}}": "zh-CN"
},
"model_item": {
"id": "qwen:0.5b",
"name": "qwen:0.5b",
"object": "model",
"created": 1755838958,
"owned_by": "ollama",
"ollama": {
"name": "qwen:0.5b",
"model": "qwen:0.5b",
"modified_at": "2025-08-17T05:40:18.859598053Z",
"size": 394998579,
"digest": "b5dc5e784f2a3ee1582373093acf69a2f4e2ac1710b253a001712b86a61f88bb",
"details": {
"parent_model": "",
"format": "gguf",
"family": "qwen2",
"families": [
"qwen2"
],
"parameter_size": "620M",
"quantization_level": "Q4_0"
},
"connection_type": "local",
"urls": [
0
]
},
"connection_type": "local",
"tags": [],
"actions": [],
"filters": []
},
"session_id": "5Heu1KioqBpmZzfCAAAN",
"chat_id": "e0a4ef34-7a60-436a-9596-61d453e71b1b",
"id": "4e357f97-981f-43af-85d8-261ee8845035",
"background_tasks": {
"title_generation": true,
"tags_generation": true,
"follow_up_generation": true
}
}

        过滤器相关代码起点在process_chat_payload方法中,具体分析如下:

'''

    本方法比较简洁,首先获取可用过滤器列表,然后调用process_filter_functions执行过滤器的inlet方法

'''

async def process_chat_payload(request, form_data, user, metadata, model):
    """Run the inlet filters over an outgoing chat request (excerpt).

    Only the filter-related part of the function is shown here: it resolves
    the applicable filter ids, loads the matching Function records and passes
    them to ``process_filter_functions`` with ``filter_type="inlet"``.

    Raises:
        Exception: Any failure while resolving or running the filters is
            re-raised as ``Exception("Error: ...")``.
    """
    # ... code unrelated to filters omitted (presumably where `extra_params`
    # is built; it is not defined in this excerpt) ...

    try:
        # get_sorted_filter_ids yields the usable filter ids; each id is then
        # resolved to its Function record.
        filter_functions = [
            Functions.get_function_by_id(filter_id)
            for filter_id in get_sorted_filter_ids(
                request, model, metadata.get("filter_ids", [])
            )
        ]

        # Invoke the `inlet` method of every filter.
        form_data, flags = await process_filter_functions(
            request=request,
            filter_functions=filter_functions,
            filter_type="inlet",
            form_data=form_data,
            extra_params=extra_params,
        )
    except Exception as e:
        raise Exception(f"Error: {e}") from e

    # ... code unrelated to filters omitted ...

        get_sorted_filter_ids代码如下:

def get_sorted_filter_ids(request, model: dict, enabled_filter_ids: list = None):
    """Return the ids of the filters usable for this request, sorted by priority.

    The logic is:

    1. Collect the global filters and the model's own filters
       (``model["info"]["meta"]["filterIds"]``) into one de-duplicated list.
    2. Fetch the filters fetched with ``active_only=True`` from the Functions store.
    3. Drop filters that are not active, and filters whose module has a
       truthy ``toggle`` attribute but whose id is not in
       ``enabled_filter_ids``.
    4. Sort the remaining ids by their ``priority`` valve (ascending; a
       missing function, missing valves or missing key counts as 0).

    Args:
        request: Passed through to ``get_function_module``.
        model: Model dict; ``info.meta.filterIds`` is read when present.
        enabled_filter_ids: Ids of toggleable filters that are switched on;
            ``None`` is treated as an empty list.

    Returns:
        List of filter ids in execution order.
    """

    def get_priority(function_id):
        function = Functions.get_function_by_id(function_id)
        if function is None:
            return 0
        valves = Functions.get_function_valves_by_id(function_id)
        return valves.get("priority", 0) if valves else 0

    def get_active_status(filter_id):
        function_module = get_function_module(request, filter_id)

        # Filters with a truthy `toggle` must be explicitly enabled.
        if getattr(function_module, "toggle", None):
            return filter_id in (enabled_filter_ids or [])

        return True

    # Global filters first, then the filters attached to the model.
    candidate_ids = [
        function.id for function in Functions.get_global_filter_functions()
    ]
    if "info" in model and "meta" in model["info"]:
        candidate_ids.extend(model["info"]["meta"].get("filterIds", []))
    # De-duplicate while keeping first-seen order, so that filters with equal
    # priority come out in a deterministic order after the stable sort below.
    candidate_ids = list(dict.fromkeys(candidate_ids))

    # Filters returned for type "filter" with active_only=True.
    active_filter_ids = {
        function.id
        for function in Functions.get_functions_by_type("filter", active_only=True)
    }

    # Check the toggle state only for candidates, so modules of unrelated
    # filters are not loaded.
    filter_ids = [
        filter_id
        for filter_id in candidate_ids
        if filter_id in active_filter_ids and get_active_status(filter_id)
    ]
    filter_ids.sort(key=get_priority)

    return filter_ids

        process_filter_functions方法代码如下:

async def process_filter_functions(
    request, filter_functions, filter_type, form_data, extra_params
):
    """Run one hook (e.g. ``"inlet"``) of every given filter over ``form_data``.

    Args:
        request: Passed through to ``get_function_module``.
        filter_functions: Function records to run, in order; falsy entries
            (e.g. ``None`` for an id that could not be resolved) are skipped.
        filter_type: Name of the handler to look up on each filter module.
        form_data: Payload handed to the first handler; each handler's return
            value becomes the input of the next one.
        extra_params: Extra keyword arguments; only those named in a handler's
            signature are passed to it.

    Returns:
        Tuple ``(form_data, {})`` with the payload after all handlers ran.

    Raises:
        Exception: Whatever a handler raises is logged at debug level and
            re-raised.
    """
    skip_files = None

    for function in filter_functions:
        # Check for a missing record before touching its attributes.
        if not function:
            continue
        filter_id = function.id

        # Load the filter module; for "stream" handlers it is requested with
        # load_from_db=False.
        function_module = get_function_module(
            request, filter_id, load_from_db=(filter_type != "stream")
        )

        # Look up the handler method for this hook; skip filters without one.
        handler = getattr(function_module, filter_type, None)
        if not handler:
            continue

        # Remember the module's file_handler flag for the cleanup step below.
        if filter_type == "inlet" and hasattr(function_module, "file_handler"):
            skip_files = function_module.file_handler

        # Refresh the module's valves from the stored values.
        if hasattr(function_module, "valves") and hasattr(function_module, "Valves"):
            valves = Functions.get_function_valves_by_id(filter_id)
            function_module.valves = function_module.Valves(
                **(valves if valves else {})
            )

        try:
            # Inspect the handler signature to decide which arguments to pass.
            sig = inspect.signature(handler)

            # "stream" handlers receive the payload as `event`, all others as `body`.
            params = {"body": form_data}
            if filter_type == "stream":
                params = {"event": form_data}

            # Add the entries of extra_params (plus __id__) that the handler
            # actually declares as parameters.
            params = params | {
                k: v
                for k, v in {
                    **extra_params,
                    "__id__": filter_id,
                }.items()
                if k in sig.parameters
            }

            # If the handler takes __user__ and the filter defines UserValves,
            # attach the per-user valves; failures are logged, not raised.
            if "__user__" in sig.parameters:
                if hasattr(function_module, "UserValves"):
                    try:
                        params["__user__"]["valves"] = function_module.UserValves(
                            **Functions.get_user_valves_by_id_and_user_id(
                                filter_id, params["__user__"]["id"]
                            )
                        )
                    except Exception as e:
                        log.exception(f"Failed to get user values: {e}")

            # Run the handler, awaiting it when it is a coroutine function.
            if inspect.iscoroutinefunction(handler):
                form_data = await handler(**params)
            else:
                form_data = handler(**params)

        except Exception as e:
            log.debug(f"Error in {filter_type} handler {filter_id}: {e}")
            raise

    # Handle file cleanup for inlet
    if skip_files and "files" in form_data.get("metadata", {}):
        # The top-level "files" key may be absent even when the metadata has one.
        form_data.pop("files", None)
        del form_data["metadata"]["files"]

    return form_data, {}

        验证一下,抓包open webui请求大模型,数据如下。

{
"model": "qwen:0.5b",
"messages": [
{
"role": "system",
"content": "Time context: Current date is Friday, August 22, 2025, Summer, Morning, the user time is 06:51:04 UTC+0."
},
{
"role": "user",
"content": "请用100个字以内客观评价一下李二曲"
}
],
"stream": true
}

        很显然,在messages中增加了一条系统消息,其中增加了当前日期和时间作为上下文。

http://www.xdnf.cn/news/18418.html

相关文章:

  • 获取后台返回的错误码
  • Linux822 shell:expect 批量
  • 车辆方向数据集 - 物体检测
  • 作品集PDF又大又卡?我用InDesign+Acrobat AI构建轻量化交互式文档工作流
  • 【LeetCode每日一题】238. 除自身以外数组的乘积
  • 【链表 - LeetCode】2. 两数相加
  • 服务器与客户端
  • 零基础从头教学Linux(Day 18)
  • 北斗导航 | 基于MCMC粒子滤波的接收机自主完好性监测(RAIM)算法(附matlab代码)
  • 【Linux我做主】细说进程地址空间
  • Spring Boot全局异常捕获指南
  • Jenkins自动化部署服务到Kubernetes环境
  • Java 面试题训练助手 Web 版本
  • JavaScript 操作 DOM
  • php apache无法接收到Authorization header
  • express+mongoose的node部署
  • 优考试局域网系统V6.0.0版
  • AI 论文周报丨多模态记忆智能体/视觉基础模型/推理模型等多领域成果一键速览
  • AI服务器介绍
  • 《Linux 网络编程一:网络编程导论及UDP 服务器的创建与数据接收》
  • 《基于大数据的农产品交易数据分析与可视化系统》选题不当,毕业答辩可能直接挂科
  • Linux系统 --- 指令
  • tauri配置允许执行eval脚本,在打包cocos游戏web/phone移动端的时候一定要配置
  • yolo训练实例(一)
  • AAA 服务器与 RADIUS 协议笔记
  • C++函数重载与引用详解
  • Django中间件自定义开发指南:从原理到实战的深度解析
  • 【机器学习深度学习】vLLM的核心优化技术详解
  • 大型语言模型中奖励模型的原理:训练、打分与更新
  • Java面试-自动装箱与拆箱机制解析