【llama.cpp】qwen2_vl_surgery.py详解
目录
■官方代码
■详解
qwen2_vl_surgery.py
■官方代码
# 克隆llama.cppgit clone https://github.com/HimariO/llama.cpp.qwen2.5vl.gitcd llama.cpp.qwen2.5vlgit checkout qwen25-vl-20250404
■详解
qwen2_vl_surgery.py
这段代码用于将 Qwen2-VL 模型的视觉部分转换为 GGUF 格式,支持 fp32/fp16 精度。主要功能包括:
- 加载模型和配置;
- 提取视觉模块权重并重命名;
- 将权重写入 GGUF 文件,供推理使用。
import argparse
from typing import Dictimport torch
import numpy as np
from gguf import *
from transformers import (AutoProcessor,Qwen2VLConfig,Qwen2VLProcessor,Qwen2VLForConditionalGeneration,Qwen2_5_VLConfig, # type: ignore[reportAttributeAccessIssue]Qwen2_5_VLForConditionalGeneration, # type: ignore[reportAttributeAccessIssue]
)VISION = "clip.vision"def k(raw_key: str, arch: str) -> str:"""格式化键名字符串,将架构信息插入到键名模板中参数:raw_key (str): 包含架构占位符的键名模板字符串,使用 {arch} 作为占位符arch (str): 架构名称,用于替换模板中的占位符返回:str: 格式化后的键名字符串,其中 {arch} 占位符被实际的架构名称替换"""return raw_key.format(arch=arch)class VL2:"""VL2 类用于处理视觉-语言模型中的张量名称转换和提取视觉模块的权重。"""@staticmethoddef to_gguf_name(name: str) -> str:"""将原始模型中的参数名称转换为 GGUF 格式兼容的名称。参数:name (str): 原始模型中的参数名称。返回:str: 转换后的 GGUF 兼容名称。"""og = namename = name.replace("text_model", "t").replace("vision_model", "v")name = name.replace("blocks", "blk").replace("embeddings.", "")name = name.replace("attn.", "attn_")name = name.replace("mlp.fc1", "ffn_down").replace("mlp.fc2", "ffn_up").replace("proj.", "out.")# name = name.replace("layrnorm", "ln").replace("layer_norm", "ln").replace("layernorm", "ln")name = name.replace("norm1", "ln1").replace("norm2", "ln2")name = name.replace("merger.mlp", 'mm')print(f"[to_gguf_name] {og} --> {name}")return name@classmethoddef find_vision_tensors(cls, qwen2vl, dtype) -> Dict[str, np.ndarray]:"""提取视觉模型中的所有张量,并根据需要进行重命名和拆分。参数:qwen2vl: 包含视觉模型的完整模型对象。dtype: 目标数据类型,用于非归一化层权重的转换。返回:Dict[str, np.ndarray]: 一个字典,键是转换后的张量名称,值是对应的 NumPy 数组。"""vision_model = qwen2vl.visualtensor_map = {}# 遍历视觉模型的所有状态字典项for name, ten in vision_model.state_dict().items():ten = ten.numpy()# 处理 QKV 合并的线性层(注意力机制中常见的合并查询、键、值)if 'qkv' in name:if ten.ndim == 2: # 权重矩阵c3, _ = ten.shapeelse: # 偏置向量c3 = ten.shape[0]assert c3 % 3 == 0c = c3 // 3wq = ten[:c]wk = ten[c: c * 2]wv = ten[c * 2:]base_name = f"vision_model.{name}"# 分别保存 Q、K、V 的权重tensor_map[cls.to_gguf_name(base_name).replace("qkv", "q")] = wqtensor_map[cls.to_gguf_name(base_name).replace("qkv", "k")] = wktensor_map[cls.to_gguf_name(base_name).replace("qkv", "v")] = wv# 处理 merger 模块中的 MLP 和 LayerNorm 层elif 'merger' in name:if name.endswith("ln_q.weight"):tensor_map['v.post_ln.weight'] = tenelif name.endswith("ln_q.bias"):tensor_map['v.post_ln.bias'] = tenelse:# "merger.mlp.%d.weight/bias" --> "mm.%d.weight/bias"tensor_map[cls.to_gguf_name(name)] = ten# 特殊处理 patch embedding 中的 3D 卷积核,将其拆分为两个 2D 卷积核elif 'patch_embed.proj.weight' in name:# NOTE: split Conv3D into Conv2Ds# 从输入张量中提取时空patch嵌入权重# 该函数假设输入张量包含两个时间步的patch嵌入信息# # 参数:# ten: 输入张量,形状为(c1, c2, kt, kh, kw)# c1, c2: 通道维度# kt: 时间维度,当前实现要求必须为2# kh, kw: 空间维度(高度和宽度)# tensor_map: 字典,用于存储提取的权重张量## 重要假设:# - 时间patch大小必须为2,这是当前实现的限制# - 输入张量的前两个维度表示通道信息# - 第三个维度表示时间步,只处理两个时间步的情况## 处理逻辑:# - 将输入张量在时间维度上分离# - 第0个时间步的权重存储为"v.patch_embd.weight"# - 第1个时间步的权重存储为"v.patch_embd.weight.1"c1, c2, kt, kh, kw = ten.shapeassert kt == 2, "Current implmentation only support temporal_patch_size of 2"tensor_map["v.patch_embd.weight"] = ten[:, :, 0, ...]tensor_map["v.patch_embd.weight.1"] = ten[:, :, 1, ...]# 其他常规张量直接映射并重命名else:tensor_map[cls.to_gguf_name(f"vision_model.{name}")] = ten# 根据张量维度决定是否转换为指定的数据类型for new_name, ten in tensor_map.items():if ten.ndim <= 1 or new_name.endswith("_norm.weight"):tensor_map[new_name] = ten.astype(np.float32)else:tensor_map[new_name] = ten.astype(dtype)# 添加一个占位的位置编码张量(dummy tensor)# 该代码块用于在tensor_map字典中添加一个名为"v.position_embd.weight"的位置编码权重张量,# 该张量初始化为10x10的零矩阵,数据类型为float32,作为占位符使用tensor_map["v.position_embd.weight"] = np.zeros([10, 10], dtype=np.float32)return tensor_mapclass VL25(VL2):"""VL25类继承自VL2类,用于处理模型名称到GGUF格式名称的转换。"""@staticmethoddef to_gguf_name(name: str) -> str:"""将模型层名称转换为GGUF格式的名称。该函数通过一系列字符串替换操作,将原始模型名称中的特定关键词替换为GGUF格式约定的缩写形式。参数:name (str): 原始模型层名称返回:str: 转换后的GGUF格式名称"""og = name# 替换模型类型相关关键词name = name.replace("text_model", "t").replace("vision_model", "v")# 替换结构相关关键词name = name.replace("blocks", "blk").replace("embeddings.", "")# 替换注意力机制相关关键词name = name.replace("attn.", "attn_")# 替换MLP层相关关键词name = name.replace("mlp.down_proj", "ffn_down").replace("mlp.up_proj", "ffn_up")name = name.replace("mlp.gate_proj", "ffn_gate").replace("proj.", "out.")# 替换归一化层相关关键词name = name.replace("norm1", "ln1").replace("norm2", "ln2")# 替换融合模块相关关键词name = name.replace("merger.mlp", 'mm')print(f"[vl25][to_gguf_name] {og} --> {name}")return namedef main(args):"""主函数,用于将 Qwen2VL 或 Qwen2.5VL 模型的视觉编码器部分导出为 GGUF 格式。参数:args: 命令行参数对象,包含以下属性:- data_type (str): 数据类型,支持 'fp32' 或 'fp16'。- model_name (str): 模型名称或本地路径。- model_type (str): 模型类型,支持 "qwen2vl" 或 "qwen2.5vl"。返回值:无返回值。输出为一个以 `-vision.gguf` 结尾的 GGUF 文件。"""# 根据指定的数据类型设置 PyTorch 和 NumPy 的数据类型以及 GGUF 文件类型标识if args.data_type == 'fp32':dtype = torch.float32np_dtype = np.float32ftype = 0elif args.data_type == 'fp16':dtype = torch.float16np_dtype = np.float16ftype = 1else:raise ValueError()local_model = Falsemodel_path = ""model_name = args.model_nameprint("model_name: ", model_name)# 加载对应类型的模型并获取其配置信息和视觉配置if args.model_type == "qwen2vl":qwen2vl = Qwen2VLForConditionalGeneration.from_pretrained(model_name, torch_dtype=dtype, device_map="cpu")cfg: Qwen2VLConfig = qwen2vl.config # type: ignore[reportAssignmentType]vcfg = cfg.vision_configelse:qwen2vl = Qwen2_5_VLForConditionalGeneration.from_pretrained(model_name, torch_dtype=dtype, device_map="cpu")cfg: Qwen2_5_VLConfig = qwen2vl.config # type: ignore[reportAssignmentType]vcfg = cfg.vision_config# 判断模型是否来自本地路径,并处理路径和模型名if os.path.isdir(model_name):local_model = Trueif model_name.endswith(os.sep):model_name = model_name[:-1]model_path = model_namemodel_name = os.path.basename(model_name)# 设置输出文件名fname_out = f"{model_name.replace('/', '-').lower()}-vision.gguf"# 初始化 GGUF 写入器并添加基本元数据fout = GGUFWriter(path=fname_out, arch="clip")fout.add_description("image encoder for Qwen2VL")fout.add_file_type(ftype)fout.add_bool("clip.has_text_encoder", False)fout.add_bool("clip.has_vision_encoder", True)fout.add_bool("clip.has_qwen2vl_merger", True)fout.add_string("clip.projector_type", "qwen2vl_merger")# 根据激活函数类型设置相应的布尔标志print(cfg.vision_config)if 'silu' in cfg.vision_config.hidden_act.lower():fout.add_bool("clip.use_silu", True)fout.add_bool("clip.use_gelu", False)elif 'gelu' in cfg.vision_config.hidden_act.lower():fout.add_bool("clip.use_silu", False)fout.add_bool("clip.use_gelu", 'quick' not in cfg.vision_config.hidden_act.lower())else:raise ValueError()# 根据模型类型添加特定的视觉模型参数if args.model_type == "qwen2.5vl":fout.add_bool("clip.use_glu_mlp", True) # gate linear unit MLP layer in vision modelfout.add_bool("clip.use_rms_norm", True)fout.add_array("clip.vision.fullatt_block_indexes", vcfg.fullatt_block_indexes)fout.add_uint32("clip.vision.window_size", vcfg.window_size)fout.add_uint32(k(KEY_EMBEDDING_LENGTH, VISION), vcfg.hidden_size)fout.add_uint32("clip.vision.projection_dim", vcfg.out_hidden_size)else:fout.add_uint32(k(KEY_EMBEDDING_LENGTH, VISION), vcfg.embed_dim)fout.add_uint32("clip.vision.projection_dim", vcfg.hidden_size)# 获取模型中的视觉相关张量并写入 GGUF 文件if args.model_type == "qwen2.5vl":tensor_map = VL25.find_vision_tensors(qwen2vl, np_dtype)else:tensor_map = VL2.find_vision_tensors(qwen2vl, np_dtype)for name, data in tensor_map.items():fout.add_tensor(name, data)# 添加视觉模型的基本结构参数fout.add_uint32("clip.vision.patch_size", vcfg.patch_size)fout.add_uint32("clip.vision.image_size", 14 * 40) # some reasonable size that is divable by (14*2)fout.add_uint32(k(KEY_ATTENTION_HEAD_COUNT, VISION), vcfg.num_heads)fout.add_float32(k(KEY_ATTENTION_LAYERNORM_EPS, VISION), 1e-6)fout.add_uint32(k(KEY_BLOCK_COUNT, VISION), vcfg.depth)fout.add_uint32(k(KEY_FEED_FORWARD_LENGTH, VISION), 0) # not sure what this does, put 0 here as a placeholderfout.add_name(model_name)"""HACK: Since vision rope related parameter aren't stored in the `Qwen2VLConfig,it will be hardcoded in the `clip_image_build_graph` from `clip.cpp`."""# 加载处理器以获取图像预处理参数(均值和标准差)if local_model:processor: Qwen2VLProcessor = AutoProcessor.from_pretrained(model_path)else:processor: Qwen2VLProcessor = AutoProcessor.from_pretrained(model_name)fout.add_array("clip.vision.image_mean", processor.image_processor.image_mean) # type: ignore[reportAttributeAccessIssue]fout.add_array("clip.vision.image_std", processor.image_processor.image_std) # type: ignore[reportAttributeAccessIssue]# 将所有数据写入文件并关闭写入器fout.write_header_to_file()fout.write_kv_data_to_file()fout.write_tensors_to_file()fout.close()print("save model as: ", fname_out)if __name__ == "__main__":parser = argparse.ArgumentParser()parser.add_argument("model_name", nargs='?', default="Qwen/Qwen2-VL-2B-Instruct")parser.add_argument("--model_type", nargs='?', choices=['qwen2vl', 'qwen2.5vl'], default="qwen2vl")parser.add_argument("--data_type", nargs='?', choices=['fp32', 'fp16'], default="fp32")args = parser.parse_args()main(args)
至此,本文分享的内容就结束了。