当前位置: 首页 > backend >正文

Python实现智能文件搜索系统:从基础到高级应用

在当今数字时代,随着数据量的爆炸性增长,高效地搜索和定位文件变得越来越重要。无论是个人用户管理自己的文档,还是企业处理海量数据,一个智能的文件搜索系统都能显著提高工作效率。本文将深入探讨如何使用Python实现一个智能文件搜索系统,从基础的文件遍历到高级的内容索引、自然语言处理和机器学习技术,全面展示智能文件搜索的实现过程与优化策略。

目录

  1. 智能文件搜索概述
  2. 基础文件系统操作
  3. 文件元数据提取与索引
  4. 全文检索实现
  5. 自然语言处理增强搜索
  6. 机器学习优化搜索结果
  7. 分布式搜索系统
  8. 完整项目实现
  9. 性能评估与优化
  10. 总结与展望

智能文件搜索概述

智能文件搜索系统不同于传统的文件查找工具,它不仅能够基于文件名进行搜索,还能分析文件内容、元数据,甚至理解用户的搜索意图,提供更加精准和个性化的搜索结果。一个完善的智能文件搜索系统通常包括以下核心功能:

  1. 基础文件遍历:能够高效地遍历文件系统,识别各种类型的文件
  2. 元数据提取:从文件中提取创建时间、修改时间、大小、类型等元数据
  3. 内容索引:建立文件内容的索引,支持全文检索
  4. 语义理解:理解文件内容和用户查询的语义,支持模糊匹配和相关性搜索
  5. 个性化推荐:根据用户的搜索历史和行为模式,提供个性化的搜索结果
  6. 实时更新:能够检测文件系统的变化,实时更新索引

在本文中,我们将逐步实现这些功能,构建一个功能完善的智能文件搜索系统。

基础文件系统操作

首先,我们需要实现基础的文件系统遍历功能,这是智能文件搜索系统的基础。Python提供了强大的文件系统操作库,如ospathlibglob,可以帮助我们高效地遍历文件系统。

使用os.walk遍历文件系统

import os
import timedef traverse_directory(root_dir):"""遍历指定目录下的所有文件Args:root_dir: 要遍历的根目录Returns:文件路径列表"""file_list = []start_time = time.time()for root, dirs, files in os.walk(root_dir):for file in files:file_path = os.path.join(root, file)file_list.append(file_path)end_time = time.time()print(f"遍历完成,共找到 {len(file_list)} 个文件,耗时 {end_time - start_time:.2f} 秒")return file_list# 示例使用
if __name__ == "__main__":files = traverse_directory("/path/to/directory")for file in files[:10]:  # 打印前10个文件print(file)

使用pathlib实现更现代的文件遍历

from pathlib import Path
import timedef traverse_directory_pathlib(root_dir):"""使用pathlib遍历指定目录下的所有文件Args:root_dir: 要遍历的根目录Returns:文件路径列表"""root_path = Path(root_dir)start_time = time.time()# 递归遍历所有文件file_list = list(root_path.rglob('*'))# 只保留文件,排除目录file_list = [str(f) for f in file_list if f.is_file()]end_time = time.time()print(f"遍历完成,共找到 {len(file_list)} 个文件,耗时 {end_time - start_time:.2f} 秒")return file_list

使用glob进行模式匹配

import glob
import timedef find_files_by_pattern(root_dir, pattern="*"):"""使用glob根据模式查找文件Args:root_dir: 要搜索的根目录pattern: 文件模式,如 "*.txt", "*.py" 等Returns:匹配的文件列表"""start_time = time.time()# 构建搜索路径search_path = os.path.join(root_dir, "**", pattern)# 递归搜索file_list = glob.glob(search_path, recursive=True)end_time = time.time()print(f"搜索完成,共找到 {len(file_list)} 个匹配的文件,耗时 {end_time - start_time:.2f} 秒")return file_list

并行文件遍历

对于大型文件系统,串行遍历可能会非常耗时。我们可以使用Python的并行处理库,如concurrent.futuresmultiprocessing,来加速文件遍历过程。

import os
import time
import concurrent.futuresdef traverse_subdirectory(directory):"""遍历单个子目录"""file_list = []for root, _, files in os.walk(directory):for file in files:file_path = os.path.join(root, file)file_list.append(file_path)return file_listdef parallel_traverse_directory(root_dir, max_workers=None):"""并行遍历文件系统Args:root_dir: 要遍历的根目录max_workers: 最大工作线程数,默认为None(由系统决定)Returns:文件路径列表"""start_time = time.time()# 获取顶层目录top_level_dirs = [os.path.join(root_dir, d) for d in os.listdir(root_dir) if os.path.isdir(os.path.join(root_dir, d))]# 如果顶层目录太少,添加根目录本身if len(top_level_dirs) < 2:top_level_dirs.append(root_dir)# 并行遍历每个顶层目录all_files = []with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:future_to_dir = {executor.submit(traverse_subdirectory, d): d for d in top_level_dirs}for future in concurrent.futures.as_completed(future_to_dir):dir_path = future_to_dir[future]try:files = future.result()all_files.extend(files)except Exception as e:print(f"遍历目录 {dir_path} 时出错: {e}")end_time = time.time()print(f"并行遍历完成,共找到 {len(all_files)} 个文件,耗时 {end_time - start_time:.2f} 秒")return all_files

文件元数据提取与索引

在实现了基础的文件遍历功能后,下一步是提取文件的元数据并建立索引。文件元数据包括文件名、路径、大小、创建时间、修改时间、文件类型等信息,这些信息对于文件搜索非常有用。

提取文件元数据

import os
import time
import datetime
import mimetypesdef extract_file_metadata(file_path):"""提取文件的元数据Args:file_path: 文件路径Returns:包含文件元数据的字典"""try:# 获取基本文件信息file_stat = os.stat(file_path)file_name = os.path.basename(file_path)file_ext = os.path.splitext(file_name)[1].lower()# 猜测MIME类型mime_type, _ = mimetypes.guess_type(file_path)# 构建元数据字典metadata = {'path': file_path,'name': file_name,'extension': file_ext,'size': file_stat.st_size,  # 文件大小(字节)'created_time': datetime.datetime.fromtimestamp(file_stat.st_ctime),'modified_time': datetime.datetime.fromtimestamp(file_stat.st_mtime),'accessed_time': datetime.datetime.fromtimestamp(file_stat.st_atime),'mime_type': mime_type or 'unknown'}return metadataexcept Exception as e:print(f"提取文件 {file_path} 的元数据时出错: {e}")return None

建立元数据索引

为了高效地搜索文件,我们需要建立元数据索引。这里我们使用SQLite数据库来存储和索引文件元数据,因为它轻量级且易于使用。

import sqlite3
import jsonclass FileIndexer:def __init__(self, db_path="file_index.db"):"""初始化文件索引器Args:db_path: SQLite数据库文件路径"""self.db_path = db_pathself.conn = Noneself.cursor = Noneself._init_database()def _init_database(self):"""初始化数据库连接和表结构"""self.conn = sqlite3.connect(self.db_path)self.cursor = self.conn.cursor()# 创建文件元数据表self.cursor.execute('''CREATE TABLE IF NOT EXISTS files (id INTEGER PRIMARY KEY AUTOINCREMENT,path TEXT UNIQUE,name TEXT,extension TEXT,size INTEGER,created_time TIMESTAMP,modified_time TIMESTAMP,accessed_time TIMESTAMP,mime_type TEXT,metadata_json TEXT,indexed_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')# 创建索引以加速搜索self.cursor.execute('CREATE INDEX IF NOT EXISTS idx_path ON files (path)')self.cursor.execute('CREATE INDEX IF NOT EXISTS idx_name ON files (name)')self.cursor.execute('CREATE INDEX IF NOT EXISTS idx_extension ON files (extension)')self.cursor.execute('CREATE INDEX IF NOT EXISTS idx_modified ON files (modified_time)')self.conn.commit()def add_file(self, metadata):"""将文件元数据添加到索引Args:metadata: 文件元数据字典"""if not metadata:returntry:# 将额外的元数据转换为JSONmetadata_json = json.dumps(metadata)# 插入或更新文件记录self.cursor.execute('''INSERT OR REPLACE INTO files (path, name, extension, size, created_time, modified_time, accessed_time, mime_type, metadata_json)VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)''', (metadata['path'],metadata['name'],metadata['extension'],metadata['size'],metadata['created_time'].isoformat(),metadata['modified_time'].isoformat(),metadata['accessed_time'].isoformat(),metadata['mime_type'],metadata_json))self.conn.commit()except Exception as e:print(f"添加文件 {metadata['path']} 到索引时出错: {e}")def index_directory(self, directory):"""索引指定目录下的所有文件Args:directory: 要索引的目录路径"""start_time = time.time()# 遍历目录file_list = traverse_directory(directory)# 提取并索引每个文件的元数据for file_path in file_list:metadata = extract_file_metadata(file_path)if metadata:self.add_file(metadata)end_time = time.time()print(f"索引完成,共索引了 {len(file_list)} 个文件,耗时 {end_time - start_time:.2f} 秒")def search_by_name(self, name_pattern):"""按文件名搜索Args:name_pattern: 文件名模式,支持SQL LIKE语法Returns:匹配的文件列表"""self.cursor.execute("SELECT * FROM files WHERE name LIKE ?", (f"%{name_pattern}%",))return self.cursor.fetchall()def search_by_extension(self, extension):"""按文件扩展名搜索Args:extension: 文件扩展名,如 ".txt", ".py"Returns:匹配的文件列表"""self.cursor.execute("SELECT * FROM files WHERE extension = ?", (extension.lower(),))return self.cursor.fetchall()def search_by_date_range(self, start_date, end_date, date_type="modified_time"):"""按日期范围搜索Args:start_date: 开始日期(datetime对象)end_date: 结束日期(datetime对象)date_type: 日期类型,可以是 "created_time", "modified_time" 或 "accessed_time"Returns:匹配的文件列表"""self.cursor.execute(f"SELECT * FROM files WHERE {date_type} BETWEEN ? AND ?", (start_date.isoformat(), end_date.isoformat()))return self.cursor.fetchall()def search_by_size_range(self, min_size, max_size):"""按文件大小范围搜索Args:min_size: 最小文件大小(字节)max_size: 最大文件大小(字节)Returns:匹配的文件列表"""self.cursor.execute("SELECT * FROM files WHERE size BETWEEN ? AND ?", (min_size, max_size))return self.cursor.fetchall()def search_by_mime_type(self, mime_type):"""按MIME类型搜索Args:mime_type: MIME类型,如 "text/plain", "image/jpeg"Returns:匹配的文件列表"""self.cursor.execute("SELECT * FROM files WHERE mime_type LIKE ?", (f"%{mime_type}%",))return self.cursor.fetchall()def advanced_search(self, query_dict):"""高级搜索,支持多条件组合Args:query_dict: 查询条件字典,如 {"name": "report", "extension": ".pdf"}Returns:匹配的文件列表"""conditions = []params = []if "name" in query_dict:conditions.append("name LIKE ?")params.append(f"%{query_dict['name']}%")if "extension" in query_dict:conditions.append("extension = ?")params.append(query_dict['extension'].lower())if "min_size" in query_dict and "max_size" in query_dict:conditions.append("size BETWEEN ? AND ?")params.append(query_dict['min_size'])params.append(query_dict['max_size'])if "start_date" in query_dict and "end_date" in query_dict:date_type = query_dict.get("date_type", "modified_time")conditions.append(f"{date_type} BETWEEN ? AND ?")params.append(query_dict['start_date'].isoformat())params.append(query_dict['end_date'].isoformat())if "mime_type" in query_dict:conditions.append("mime_type LIKE ?")params.append(f"%{query_dict['mime_type']}%")# 构建SQL查询sql = "SELECT * FROM files"if conditions:sql += " WHERE " + " AND ".join(conditions)self.cursor.execute(sql, params)return self.cursor.fetchall()def close(self):"""关闭数据库连接"""if self.conn:self.conn.close()

全文检索实现

元数据搜索虽然有用,但对于文本文件,我们还需要实现全文检索功能,使用户能够搜索文件内容。为此,我们可以使用Whoosh库,它是一个纯Python实现的全文搜索引擎。

安装Whoosh

pip install whoosh

实现全文索引和搜索

import os
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT, ID, DATETIME, NUMERIC
from whoosh.qparser import QueryParser, MultifieldParser
import whoosh.index as index
import datetimeclass FullTextSearcher:def __init__(self, index_dir="fulltext_index"):"""初始化全文搜索器Args:index_dir: 索引目录"""self.index_dir = index_dirself._init_index()def _init_index(self):"""初始化索引"""# 创建索引目录if not os.path.exists(self.index_dir):os.makedirs(self.index_dir)# 定义文档模式self.schema = Schema(path=ID(stored=True, unique=True),content=TEXT(stored=True),name=TEXT(stored=True),extension=TEXT(stored=True),size=NUMERIC(stored=True),modified_time=DATETIME(stored=True))# 创建或打开索引if not index.exists_in(self.index_dir):self.ix = create_in(self.index_dir, self.schema)else:self.ix = open_dir(self.index_dir)def _extract_text_content(self, file_path):"""提取文件的文本内容Args:file_path: 文件路径Returns:文件的文本内容"""try:# 这里只处理简单的文本文件,实际应用中需要处理更多文件类型with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:return f.read()except Exception as e:print(f"读取文件 {file_path} 内容时出错: {e}")return ""def index_file(self, file_path):"""索引单个文件Args:file_path: 文件路径"""try:# 提取文件元数据metadata = extract_file_metadata(file_path)if not metadata:return# 提取文件内容content = self._extract_text_content(file_path)# 添加到索引writer = self.ix.writer()writer.add_document(path=file_path,content=content,name=metadata['name'],extension=metadata['extension'],size=metadata['size'],modified_time=metadata['modified_time'])writer.commit()except Exception as e:print(f"索引文件 {file_path} 时出错: {e}")def index_directory(self, directory):"""索引目录中的所有文本文件Args:directory: 目录路径"""start_time = time.time()# 遍历目录file_list = traverse_directory(directory)# 过滤出文本文件text_extensions = ['.txt', '.md', '.py', '.java', '.c', '.cpp', '.h', '.html', '.css', '.js', '.json', '.xml']text_files = [f for f in file_list if os.path.splitext(f)[1].lower() in text_extensions]# 索引每个文件writer = self.ix.writer()indexed_count = 0for file_path in text_files:try:# 提取文件元数据metadata = extract_file_metadata(file_path)if not metadata:continue# 提取文件内容content = self._extract_text_content(file_path)# 添加到索引writer.add_document(path=file_path,content=content,name=metadata['name'],extension=metadata['extension'],size=metadata['size'],modified_time=metadata['modified_time'])indexed_count += 1except Exception as e:print(f"索引文件 {file_path} 时出错: {e}")# 提交所有更改writer.commit()end_time = time.time()print(f"全文索引完成,共索引了 {indexed_count} 个文本文件,耗时 {end_time - start_time:.2f} 秒")def search(self, query_text, field="content"):"""搜索文件内容Args:query_text: 查询文本field: 要搜索的字段,默认为"content"Returns:匹配的文件列表"""results = []with self.ix.searcher() as searcher:query = QueryParser(field, self.ix.schema).parse(query_text)search_results = searcher.search(query, limit=None)for result in search_results:results.append({'path': result['path'],'name': result['name'],'extension': result['extension'],'size': result['size'],'modified_time': result['modified_time'],'content_snippet': result.highlights("content")})return resultsdef multi_field_search(self, query_text, fields=["content", "name"]):"""在多个字段中搜索Args:query_text: 查询文本fields: 要搜索的字段列表Returns:匹配的文件列表"""results = []with self.ix.searcher() as searcher:query = MultifieldParser(fields, self.ix.schema).parse(query_text)search_results = searcher.search(query, limit=None)for result in search_results:results.append({'path': result['path'],'name': result['name'],'extension': result['extension'],'size': result['size'],'modified_time': result['modified_time'],'content_snippet': result.highlights("content") if "content" in fields else ""})return results

机器学习优化搜索结果

机器学习可以帮助我们根据用户的搜索行为和偏好来优化搜索结果的排序,提供更加个性化的搜索体验。

基于用户行为的排序优化

import numpy as np
from sklearn.ensemble import RandomForestClassifierclass SearchRanker:def __init__(self):"""初始化搜索结果排序器"""self.model = RandomForestClassifier(n_estimators=100)self.trained = Falsedef extract_features(self, query, document):"""提取查询和文档的特征Args:query: 查询文本document: 文档信息Returns:特征向量"""# 这里是一个简化的特征提取示例# 实际应用中可以提取更多特征features = []# 1. 文档类型特征ext = document.get('extension', '').lower()is_text = 1 if ext in ['.txt', '.md', '.py', '.java'] else 0is_doc = 1 if ext in ['.doc', '.docx', '.pdf'] else 0is_image = 1 if ext in ['.jpg', '.png', '.gif'] else 0features.extend([is_text, is_doc, is_image])# 2. 文件大小特征(归一化)size = document.get('size', 0)normalized_size = np.log1p(size) / 20  # 简单归一化features.append(normalized_size)# 3. 文件年龄特征if 'modified_time' in document:age_days = (datetime.datetime.now() - document['modified_time']).daysnormalized_age = np.log1p(age_days) / 10  # 简单归一化features.append(normalized_age)else:features.append(0)# 4. 查询相关特征query_terms = set(query.lower().split())name_terms = set(document.get('name', '').lower().split())# 文件名中包含查询词的比例name_match_ratio = len(query_terms.intersection(name_terms)) / max(len(query_terms), 1)features.append(name_match_ratio)return np.array(features)def train(self, training_data):"""训练排序模型Args:training_data: 训练数据,格式为[(query, document, relevance_score), ...]"""X = []y = []for query, document, relevance in training_data:features = self.extract_features(query, document)X.append(features)y.append(relevance)self.model.fit(X, y)self.trained = Truedef rank(self, query, documents):"""对搜索结果进行排序Args:query: 查询文本documents: 文档列表Returns:排序后的文档列表"""if not self.trained:# 如果模型未训练,按原顺序返回return documents# 提取特征并预测相关性分数scores = []for doc in documents:features = self.extract_features(query, doc)score = self.model.predict_proba([features])[0][1]  # 获取正类的概率scores.append(score)# 按分数排序ranked_docs = [doc for _, doc in sorted(zip(scores, documents), key=lambda x: x[0], reverse=True)]return ranked_docs

协同过滤推荐

除了基于内容的搜索,我们还可以使用协同过滤来推荐文件,基于用户之间的相似性。

import numpy as np
from scipy.sparse import csr_matrix
from sklearn.metrics.pairwise import cosine_similarityclass CollaborativeFilterRecommender:def __init__(self):"""初始化协同过滤推荐器"""self.user_item_matrix = Noneself.users = []self.items = []self.user_to_index = {}self.item_to_index = {}def fit(self, user_item_interactions):"""训练协同过滤模型Args:user_item_interactions: 用户-项目交互列表,格式为[(user_id, item_id, rating), ...]"""# 提取所有用户和项目users = set()items = set()for user, item, _ in user_item_interactions:users.add(user)items.add(item)self.users = list(users)self.items = list(items)# 创建映射self.user_to_index = {user: i for i, user in enumerate(self.users)}self.item_to_index = {item: i for i, item in enumerate(self.items)}# 创建用户-项目矩阵n_users = len(self.users)n_items = len(self.items)# 使用稀疏矩阵存储data = []row_ind = []col_ind = []for user, item, rating in user_item_interactions:user_idx = self.user_to_index[user]item_idx = self.item_to_index[item]data.append(rating)row_ind.append(user_idx)col_ind.append(item_idx)self.user_item_matrix = csr_matrix((data, (row_ind, col_ind)), shape=(n_users, n_items))def recommend_items(self, user_id, top_n=5):"""为用户推荐项目Args:user_id: 用户IDtop_n: 推荐的项目数量Returns:推荐的项目列表"""if user_id not in self.user_to_index:return []user_idx = self.user_to_index[user_id]# 计算用户相似度user_similarities = cosine_similarity(self.user_item_matrix[user_idx], self.user_item_matrix)user_similarities = user_similarities.flatten()# 找到最相似的用户(排除自己)similar_users = np.argsort(user_similarities)[::-1][1:6]  # 取前5个最相似的用户# 获取相似用户喜欢但当前用户未交互的项目user_items = set(self.user_item_matrix[user_idx].nonzero()[1])recommendations = {}for similar_user in similar_users:similar_user_items = self.user_item_matrix[similar_user].nonzero()[1]for item in similar_user_items:if item not in user_items:# 使用相似度作为权重weight = user_similarities[similar_user]if item in recommendations:recommendations[item] += weightelse:recommendations[item] = weight# 排序并返回推荐项目recommended_items = sorted(recommendations.items(), key=lambda x: x[1], reverse=True)[:top_n]return [self.items[item] for item, _ in recommended_items]

分布式搜索系统

对于大规模文件系统,单机搜索可能无法满足性能需求。我们可以实现一个简单的分布式搜索系统,将索引和搜索任务分散到多台机器上。

使用Celery实现分布式任务

from celery import Celery
import os
import json# 创建Celery实例
app = Celery('file_search', broker='redis://localhost:6379/0')@app.task
def index_directory(directory, index_id):"""索引目录(作为Celery任务)Args:directory: 目录路径index_id: 索引ID"""# 创建索引目录index_dir = f"index_{index_id}"if not os.path.exists(index_dir):os.makedirs(index_dir)# 创建索引器indexer = FileIndexer(f"{index_dir}/metadata.db")text_searcher = FullTextSearcher(f"{index_dir}/fulltext")# 遍历目录file_list = traverse_directory(directory)# 索引文件for file_path in file_list:# 提取元数据metadata = extract_file_metadata(file_path)if metadata:indexer.add_file(metadata)# 对文本文件进行全文索引if is_text_file(file_path):text_searcher.index_file(file_path)return {"index_id": index_id,"directory": directory,"file_count": len(file_list)}@app.task
def search_index(index_id, query, query_type="metadata"):"""搜索索引(作为Celery任务)Args:index_id: 索引IDquery: 查询字符串query_type: 查询类型,可以是"metadata"或"fulltext""""index_dir = f"index_{index_id}"if query_type == "metadata":# 元数据搜索indexer = FileIndexer(f"{index_dir}/metadata.db")results = indexer.search_by_name(query)else:# 全文搜索text_searcher = FullTextSearcher(f"{index_dir}/fulltext")results = text_searcher.search(query)return {"index_id": index_id,"query": query,"query_type": query_type,"results": results}def is_text_file(file_path):"""判断文件是否为文本文件Args:file_path: 文件路径Returns:是否为文本文件"""text_extensions = ['.txt', '.md', '.py', '.java', '.c', '.cpp', '.h', '.html', '.css', '.js', '.json', '.xml']return os.path.splitext(file_path)[1].lower() in text_extensions

分布式索引管理器

class DistributedIndexManager:def __init__(self, redis_url='redis://localhost:6379/0'):"""初始化分布式索引管理器Args:redis_url: Redis服务器URL"""import redisself.redis = redis.from_url(redis_url)def create_index_task(self, directory):"""创建索引任务Args:directory: 要索引的目录Returns:索引ID"""# 生成唯一索引IDimport uuidindex_id = str(uuid.uuid4())# 存储索引信息index_info = {"id": index_id,"directory": directory,"status": "pending","created_at": datetime.datetime.now().isoformat()}self.redis.set(f"index:{index_id}", json.dumps(index_info))# 创建索引任务index_directory.delay(directory, index_id)return index_iddef get_index_status(self, index_id):"""获取索引状态Args:index_id: 索引IDReturns:索引状态信息"""index_info = self.redis.get(f"index:{index_id}")if index_info:return json.loads(index_info)return Nonedef search(self, query, query_type="metadata"):"""在所有索引中搜索Args:query: 查询字符串query_type: 查询类型Returns:搜索结果"""# 获取所有索引index_keys = self.redis.keys("index:*")index_ids = [key.decode().split(":")[1] for key in index_keys]# 创建搜索任务search_tasks = [search_index.delay(index_id, query, query_type) for index_id in index_ids]# 收集结果all_results = []for task in search_tasks:result = task.get(timeout=30)  # 等待最多30秒if result and "results" in result:all_results.extend(result["results"])return all_results

完整项目实现

现在,我们将前面介绍的各种组件整合到一个完整的智能文件搜索系统中。

系统架构

我们的智能文件搜索系统包括以下组件:

  1. 文件爬虫:遍历文件系统,收集文件信息
  2. 索引器:建立文件元数据和内容的索引
  3. 搜索引擎:处理用户查询,返回相关结果
  4. 用户界面:提供交互界面,展示搜索结果

核心类实现

import os
import time
import datetime
import sqlite3
import json
import threading
import logging
from pathlib import Path
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT, ID, DATETIME, NUMERIC
from whoosh.qparser import QueryParser, MultifieldParser
import whoosh.index as index# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('IntelligentFileSearch')class FileSearchSystem:def __init__(self, index_dir="search_index"):"""初始化文件搜索系统Args:index_dir: 索引目录"""self.index_dir = index_dir# 创建索引目录if not os.path.exists(index_dir):os.makedirs(index_dir)# 初始化组件self.metadata_indexer = FileIndexer(os.path.join(index_dir, "metadata.db"))self.fulltext_searcher = FullTextSearcher(os.path.join(index_dir, "fulltext"))self.semantic_searcher = None  # 延迟初始化# 索引状态self.indexing = Falseself.indexed_files = 0self.total_files = 0def index_directory(self, directory, include_fulltext=True, recursive=True):"""索引目录Args:directory: 要索引的目录include_fulltext: 是否包含全文索引recursive: 是否递归索引子目录"""if self.indexing:logger.warning("已有索引任务正在进行")return Falseself.indexing = Trueself.indexed_files = 0# 在后台线程中执行索引thread = threading.Thread(target=self._index_directory_thread,args=(directory, include_fulltext, recursive))thread.daemon = Truethread.start()return Truedef _index_directory_thread(self, directory, include_fulltext, recursive):"""后台索引线程"""try:logger.info(f"开始索引目录: {directory}")start_time = time.time()# 遍历文件if recursive:file_list = traverse_directory(directory)else:file_list = [os.path.join(directory, f) for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]self.total_files = len(file_list)logger.info(f"找到 {self.total_files} 个文件")# 索引文件for file_path in file_list:try:# 提取元数据metadata = extract_file_metadata(file_path)if metadata:self.metadata_indexer.add_file(metadata)# 全文索引if include_fulltext and is_text_file(file_path):self.fulltext_searcher.index_file(file_path)self.indexed_files += 1if self.indexed_files % 100 == 0:logger.info(f"已索引 {self.indexed_files}/{self.total_files} 个文件")except Exception as e:logger.error(f"索引文件 {file_path} 时出错: {e}")end_time = time.time()logger.info(f"索引完成,共索引了 {self.indexed_files} 个文件,耗时 {end_time - start_time:.2f} 秒")except Exception as e:logger.error(f"索引过程中出错: {e}")finally:self.indexing = Falsedef get_indexing_status(self):"""获取索引状态Returns:索引状态信息"""return {"indexing": self.indexing,"indexed_files": self.indexed_files,"total_files": self.total_files,"progress": self.indexed_files / max(self.total_files, 1)}def search(self, query, search_type="all", max_results=100):"""搜索文件Args:query: 查询字符串search_type: 搜索类型,可以是"metadata", "fulltext", "semantic"或"all"max_results: 最大结果数Returns:搜索结果列表"""results = []if search_type in ["metadata", "all"]:# 元数据搜索metadata_results = self.metadata_indexer.search_by_name(query)results.extend([{"path": r[1],"name": r[2],"extension": r[3],"size": r[4],"modified_time": r[6],"type": "metadata","score": 0.5  # 默认分数}for r in metadata_results[:max_results]])if search_type in ["fulltext", "all"]:# 全文搜索fulltext_results = self.fulltext_searcher.search(query)results.extend([{"path": r["path"],"name": r["name"],"extension": r["extension"],"size": r["size"],"modified_time": r["modified_time"],"content_snippet": r["content_snippet"],"type": "fulltext","score": 0.7  # 默认分数}for r in fulltext_results[:max_results]])if search_type in ["semantic", "all"] and query.strip():# 语义搜索(如果查询不为空)if not self.semantic_searcher:try:# 延迟初始化语义搜索器from sentence_transformers import SentenceTransformerself.semantic_searcher = AdvancedSemanticSearcher()# 从全文索引中提取文档documents = []document_paths = []with self.fulltext_searcher.ix.searcher() as searcher:for doc in searcher.documents():documents.append(doc.get("content", ""))document_paths.append(doc.get("path", ""))if documents:self.semantic_searcher.add_documents(documents)self.document_paths = document_pathsexcept ImportError:logger.warning("无法初始化语义搜索器,请安装sentence-transformers库")self.semantic_searcher = Noneif self.semantic_searcher and hasattr(self.semantic_searcher, 'document_embeddings'):semantic_results = self.semantic_searcher.search(query, top_n=max_results)for idx, score, _ in semantic_results:if 0 <= idx < len(self.document_paths):path = self.document_paths[idx]# 获取文件元数据metadata = extract_file_metadata(path)if metadata:results.append({"path": path,"name": metadata["name"],"extension": metadata["extension"],"size": metadata["size"],"modified_time": metadata["modified_time"],"type": "semantic","score": float(score)})# 去重(按路径)unique_results = {}for r in results:path = r["path"]if path not in unique_results or r["score"] > unique_results[path]["score"]:unique_results[path] = r# 排序(按分数降序)sorted_results = sorted(unique_results.values(), key=lambda x: x["score"], reverse=True)return sorted_results[:max_results]def close(self):"""关闭搜索系统,释放资源"""self.metadata_indexer.close()# 其他资源清理...def is_text_file(file_path):"""判断文件是否为文本文件"""text_extensions = ['.txt', '.md', '.py', '.java', '.c', '.cpp', '.h', '.html', '.css', '.js', '.json', '.xml']return os.path.splitext(file_path)[1].lower() in text_extensions

命令行界面

import argparse
import sys
import timedef main():parser = argparse.ArgumentParser(description='智能文件搜索系统')subparsers = parser.add_subparsers(dest='command', help='命令')# 索引命令index_parser = subparsers.add_parser('index', help='索引目录')index_parser.add_argument('directory', help='要索引的目录路径')index_parser.add_argument('--no-fulltext', action='store_true', help='不包含全文索引')index_parser.add_argument('--no-recursive', action='store_true', help='不递归索引子目录')# 搜索命令search_parser = subparsers.add_parser('search', help='搜索文件')search_parser.add_argument('query', help='搜索查询')search_parser.add_argument('--type', choices=['metadata', 'fulltext', 'semantic', 'all'], default='all', help='搜索类型')search_parser.add_argument('--max', type=int, default=20, help='最大结果数')# 解析参数args = parser.parse_args()# 创建搜索系统search_system = FileSearchSystem()if args.command == 'index':# 索引目录include_fulltext = not args.no_fulltextrecursive = not args.no_recursiveprint(f"开始索引目录: {args.directory}")print(f"包含全文索引: {include_fulltext}")print(f"递归索引: {recursive}")search_system.index_directory(args.directory, include_fulltext, recursive)# 等待索引完成while search_system.indexing:status = search_system.get_indexing_status()progress = status["progress"] * 100print(f"\r索引进度: {status['indexed_files']}/{status['total_files']} ({progress:.1f}%)", end="")time.sleep(0.5)print("\n索引完成!")elif args.command == 'search':# 搜索文件print(f"搜索: {args.query}")print(f"搜索类型: {args.type}")results = search_system.search(args.query, args.type, args.max)if not results:print("未找到匹配的文件")else:print(f"找到 {len(results)} 个匹配的文件:")for i, result in enumerate(results, 1):print(f"\n{i}. {result['name']} ({result['path']})")print(f"   类型: {result['extension']}, 大小: {format_size(result['size'])}")print(f"   修改时间: {result['modified_time']}")if 'content_snippet' in result:print(f"   内容片段: {result['content_snippet']}")else:parser.print_help()# 关闭搜索系统search_system.close()def format_size(size_bytes):"""格式化文件大小"""if size_bytes < 1024:return f"{size_bytes} B"elif size_bytes < 1024 * 1024:return f"{size_bytes / 1024:.1f} KB"elif size_bytes < 1024 * 1024 * 1024:return f"{size_bytes / (1024 * 1024):.1f} MB"else:return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB"if __name__ == "__main__":main()

Web界面

为了提供更友好的用户体验,我们可以使用Flask实现一个简单的Web界面。

from flask import Flask, render_template, request, jsonify
import os
import threadingapp = Flask(__name__)# 创建搜索系统实例
search_system = FileSearchSystem()@app.route('/')
def index():"""首页"""return render_template('index.html')@app.route('/api/index', methods=['POST'])
def api_index():"""索引API"""data = request.jsondirectory = data.get('directory')include_fulltext = data.get('include_fulltext', True)recursive = data.get('recursive', True)if not directory or not os.path.exists(directory):return jsonify({"error": "目录不存在"}), 400success = search_system.index_directory(directory, include_fulltext, recursive)if success:return jsonify({"status": "indexing"})else:return jsonify({"error": "已有索引任务正在进行"}), 400@app.route('/api/index/status')
def api_index_status():"""索引状态API"""return jsonify(search_system.get_indexing_status())@app.route('/api/search')
def api_search():"""搜索API"""query = request.args.get('query', '')search_type = request.args.get('type', 'all')max_results = int(request.args.get('max', 100))if not query:return jsonify({"results": []})results = search_system.search(query, search_type, max_results)# 转换日期时间对象为字符串for result in results:if isinstance(result.get('modified_time'), datetime.datetime):result['modified_time'] = result['modified_time'].isoformat()return jsonify({"results": results})if __name__ == '__main__':app.run(debug=True)

性能评估与优化

为了评估我们的智能文件搜索系统的性能,我们可以进行一系列的测试和优化。

性能测试

import time
import random
import os
import shutil
import matplotlib.pyplot as plt
import numpy as npdef generate_test_files(directory, num_files, file_size_range=(1024, 1024*1024)):"""生成测试文件Args:directory: 目录路径num_files: 文件数量file_size_range: 文件大小范围(字节)"""if not os.path.exists(directory):os.makedirs(directory)for i in range(num_files):file_path = os.path.join(directory, f"test_file_{i}.txt")file_size = random.randint(*file_size_range)with open(file_path, 'w') as f:# 生成随机文本words = ["lorem", "ipsum", "dolor", "sit", "amet", "consectetur", "adipiscing", "elit"]text = " ".join(random.choices(words, k
# Python实现智能文件搜索系统:从基础到高级应用在当今数字时代,随着数据量的爆炸性增长,高效地搜索和定位文件变得越来越重要。无论是个人用户管理自己的文档,还是企业处理海量数据,一个智能的文件搜索系统都能显著提高工作效率。本文将深入探讨如何使用Python实现一个智能文件搜索系统,从基础的文件遍历到高级的内容索引、自然语言处理和机器学习技术,全面展示智能文件搜索的实现过程与优化策略。## 目录1. [智能文件搜索概述](#智能文件搜索概述)
2. [基础文件系统操作](#基础文件系统操作)
3. [文件元数据提取与索引](#文件元数据提取与索引)
4. [全文检索实现](#全文检索实现)
5. [自然语言处理增强搜索](#自然语言处理增强搜索)
6. [机器学习优化搜索结果](#机器学习优化搜索结果)
7. [分布式搜索系统](#分布式搜索系统)
8. [完整项目实现](#完整项目实现)
9. [性能评估与优化](#性能评估与优化)
10. [总结与展望](#总结与展望)## 智能文件搜索概述智能文件搜索系统不同于传统的文件查找工具,它不仅能够基于文件名进行搜索,还能分析文件内容、元数据,甚至理解用户的搜索意图,提供更加精准和个性化的搜索结果。一个完善的智能文件搜索系统通常包括以下核心功能:1. **基础文件遍历**:能够高效地遍历文件系统,识别各种类型的文件
2. **元数据提取**:从文件中提取创建时间、修改时间、大小、类型等元数据
3. **内容索引**:建立文件内容的索引,支持全文检索
4. **语义理解**:理解文件内容和用户查询的语义,支持模糊匹配和相关性搜索
5. **个性化推荐**:根据用户的搜索历史和行为模式,提供个性化的搜索结果
6. **实时更新**:能够检测文件系统的变化,实时更新索引在本文中,我们将逐步实现这些功能,构建一个功能完善的智能文件搜索系统。## 基础文件系统操作首先,我们需要实现基础的文件系统遍历功能,这是智能文件搜索系统的基础。Python提供了强大的文件系统操作库,如`os`、`pathlib`和`glob`,可以帮助我们高效地遍历文件系统。### 使用os.walk遍历文件系统```python
import os
import timedef traverse_directory(root_dir):"""遍历指定目录下的所有文件Args:root_dir: 要遍历的根目录Returns:文件路径列表"""file_list = []start_time = time.time()for root, dirs, files in os.walk(root_dir):for file in files:file_path = os.path.join(root, file)file_list.append(file_path)end_time = time.time()print(f"遍历完成,共找到 {len(file_list)} 个文件,耗时 {end_time - start_time:.2f} 秒")return file_list# 示例使用
if __name__ == "__main__":files = traverse_directory("/path/to/directory")for file in files[:10]:  # 打印前10个文件print(file)

使用pathlib实现更现代的文件遍历

from pathlib import Path
import timedef traverse_directory_pathlib(root_dir):"""使用pathlib遍历指定目录下的所有文件Args:root_dir: 要遍历的根目录Returns:文件路径列表"""root_path = Path(root_dir)start_time = time.time()# 递归遍历所有文件file_list = list(root_path.rglob('*'))# 只保留文件,排除目录file_list = [str(f) for f in file_list if f.is_file()]end_time = time.time()print(f"遍历完成,共找到 {len(file_list)} 个文件,耗时 {end_time - start_time:.2f} 秒")return file_list

使用glob进行模式匹配

import glob
import timedef find_files_by_pattern(root_dir, pattern="*"):"""使用glob根据模式查找文件Args:root_dir: 要搜索的根目录pattern: 文件模式,如 "*.txt", "*.py" 等Returns:匹配的文件列表"""start_time = time.time()# 构建搜索路径search_path = os.path.join(root_dir, "**", pattern)# 递归搜索file_list = glob.glob(search_path, recursive=True)end_time = time.time()print(f"搜索完成,共找到 {len(file_list)} 个匹配的文件,耗时 {end_time - start_time:.2f} 秒")return file_list

并行文件遍历

对于大型文件系统,串行遍历可能会非常耗时。我们可以使用Python的并行处理库,如concurrent.futuresmultiprocessing,来加速文件遍历过程。

import os
import time
import concurrent.futuresdef traverse_subdirectory(directory):"""遍历单个子目录"""file_list = []for root, _, files in os.walk(directory):for file in files:file_path = os.path.join(root, file)file_list.append(file_path)return file_listdef parallel_traverse_directory(root_dir, max_workers=None):"""并行遍历文件系统Args:root_dir: 要遍历的根目录max_workers: 最大工作线程数,默认为None(由系统决定)Returns:文件路径列表"""start_time = time.time()# 获取顶层目录top_level_dirs = [os.path.join(root_dir, d) for d in os.listdir(root_dir) if os.path.isdir(os.path.join(root_dir, d))]# 如果顶层目录太少,添加根目录本身if len(top_level_dirs) < 2:top_level_dirs.append(root_dir)# 并行遍历每个顶层目录all_files = []with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:future_to_dir = {executor.submit(traverse_subdirectory, d): d for d in top_level_dirs}for future in concurrent.futures.as_completed(future_to_dir):dir_path = future_to_dir[future]try:files = future.result()all_files.extend(files)except Exception as e:print(f"遍历目录 {dir_path} 时出错: {e}")end_time = time.time()print(f"并行遍历完成,共找到 {len(all_files)} 个文件,耗时 {end_time - start_time:.2f} 秒")return all_files

文件元数据提取与索引

在实现了基础的文件遍历功能后,下一步是提取文件的元数据并建立索引。文件元数据包括文件名、路径、大小、创建时间、修改时间、文件类型等信息,这些信息对于文件搜索非常有用。

提取文件元数据

import os
import time
import datetime
import mimetypesdef extract_file_metadata(file_path):"""提取文件的元数据Args:file_path: 文件路径Returns:包含文件元数据的字典"""try:# 获取基本文件信息file_stat = os.stat(file_path)file_name = os.path.basename(file_path)file_ext = os.path.splitext(file_name)[1].lower()# 猜测MIME类型mime_type, _ = mimetypes.guess_type(file_path)# 构建元数据字典metadata = {'path': file_path,'name': file_name,'extension': file_ext,'size': file_stat.st_size,  # 文件大小(字节)'created_time': datetime.datetime.fromtimestamp(file_stat.st_ctime),'modified_time': datetime.datetime.fromtimestamp(file_stat.st_mtime),'accessed_time': datetime.datetime.fromtimestamp(file_stat.st_atime),'mime_type': mime_type or 'unknown'}return metadataexcept Exception as e:print(f"提取文件 {file_path} 的元数据时出错: {e}")return None

建立元数据索引

为了高效地搜索文件,我们需要建立元数据索引。这里我们使用SQLite数据库来存储和索引文件元数据,因为它轻量级且易于使用。

import sqlite3
import jsonclass FileIndexer:def __init__(self, db_path="file_index.db"):"""初始化文件索引器Args:db_path: SQLite数据库文件路径"""self.db_path = db_pathself.conn = Noneself.cursor = Noneself._init_database()def _init_database(self):"""初始化数据库连接和表结构"""self.conn = sqlite3.connect(self.db_path)self.cursor = self.conn.cursor()# 创建文件元数据表self.cursor.execute('''CREATE TABLE IF NOT EXISTS files (id INTEGER PRIMARY KEY AUTOINCREMENT,path TEXT UNIQUE,name TEXT,extension TEXT,size INTEGER,created_time TIMESTAMP,modified_time TIMESTAMP,accessed_time TIMESTAMP,mime_type TEXT,metadata_json TEXT,indexed_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')# 创建索引以加速搜索self.cursor.execute('CREATE INDEX IF NOT EXISTS idx_path ON files (path)')self.cursor.execute('CREATE INDEX IF NOT EXISTS idx_name ON files (name)')self.cursor.execute('CREATE INDEX IF NOT EXISTS idx_extension ON files (extension)')self.cursor.execute('CREATE INDEX IF NOT EXISTS idx_modified ON files (modified_time)')self.conn.commit()def add_file(self, metadata):"""将文件元数据添加到索引Args:metadata: 文件元数据字典"""if not metadata:returntry:# 将额外的元数据转换为JSONmetadata_json = json.dumps(metadata)# 插入或更新文件记录self.cursor.execute('''INSERT OR REPLACE INTO files (path, name, extension, size, created_time, modified_time, accessed_time, mime_type, metadata_json)VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)''', (metadata['path'],metadata['name'],metadata['extension'],metadata['size'],metadata['created_time'].isoformat(),metadata['modified_time'].isoformat(),metadata['accessed_time'].isoformat(),metadata['mime_type'],metadata_json))self.conn.commit()except Exception as e:print(f"添加文件 {metadata['path']} 到索引时出错: {e}")def index_directory(self, directory):"""索引指定目录下的所有文件Args:directory: 要索引的目录路径"""start_time = time.time()# 遍历目录file_list = traverse_directory(directory)# 提取并索引每个文件的元数据for file_path in file_list:metadata = extract_file_metadata(file_path)if metadata:self.add_file(metadata)end_time = time.time()print(f"索引完成,共索引了 {len(file_list)} 个文件,耗时 {end_time - start_time:.2f} 秒")def search_by_name(self, name_pattern):"""按文件名搜索Args:name_pattern: 文件名模式,支持SQL LIKE语法Returns:匹配的文件列表"""self.cursor.execute("SELECT * FROM files WHERE name LIKE ?", (f"%{name_pattern}%",))return self.cursor.fetchall()def search_by_extension(self, extension):"""按文件扩展名搜索Args:extension: 文件扩展名,如 ".txt", ".py"Returns:匹配的文件列表"""self.cursor.execute("SELECT * FROM files WHERE extension = ?", (extension.lower(),))return self.cursor.fetchall()def search_by_date_range(self, start_date, end_date, date_type="modified_time"):"""按日期范围搜索Args:start_date: 开始日期(datetime对象)end_date: 结束日期(datetime对象)date_type: 日期类型,可以是 "created_time", "modified_time" 或 "accessed_time"Returns:匹配的文件列表"""self.cursor.execute(f"SELECT * FROM files WHERE {date_type} BETWEEN ? AND ?", (start_date.isoformat(), end_date.isoformat()))return self.cursor.fetchall()def search_by_size_range(self, min_size, max_size):"""按文件大小范围搜索Args:min_size: 最小文件大小(字节)max_size: 最大文件大小(字节)Returns:匹配的文件列表"""self.cursor.execute("SELECT * FROM files WHERE size BETWEEN ? AND ?", (min_size, max_size))return self.cursor.fetchall()def search_by_mime_type(self, mime_type):"""按MIME类型搜索Args:mime_type: MIME类型,如 "text/plain", "image/jpeg"Returns:匹配的文件列表"""self.cursor.execute("SELECT * FROM files WHERE mime_type LIKE ?", (f"%{mime_type}%",))return self.cursor.fetchall()def advanced_search(self, query_dict):"""高级搜索,支持多条件组合Args:query_dict: 查询条件字典,如 {"name": "report", "extension": ".pdf"}Returns:匹配的文件列表"""conditions = []params = []if "name" in query_dict:conditions.append("name LIKE ?")params.append(f"%{query_dict['name']}%")if "extension" in query_dict:conditions.append("extension = ?")params.append(query_dict['extension'].lower())if "min_size" in query_dict and "max_size" in query_dict:conditions.append("size BETWEEN ? AND ?")params.append(query_dict['min_size'])params.append(query_dict['max_size'])if "start_date" in query_dict and "end_date" in query_dict:date_type = query_dict.get("date_type", "modified_time")conditions.append(f"{date_type} BETWEEN ? AND ?")params.append(query_dict['start_date'].isoformat())params.append(query_dict['end_date'].isoformat())if "mime_type" in query_dict:conditions.append("mime_type LIKE ?")params.append(f"%{query_dict['mime_type']}%")# 构建SQL查询sql = "SELECT * FROM files"if conditions:sql += " WHERE " + " AND ".join(conditions)self.cursor.execute(sql, params)return self.cursor.fetchall()def close(self):"""关闭数据库连接"""if self.conn:self.conn.close()

全文检索实现

元数据搜索虽然有用,但对于文本文件,我们还需要实现全文检索功能,使用户能够搜索文件内容。为此,我们可以使用Whoosh库,它是一个纯Python实现的全文搜索引擎。

安装Whoosh

pip install whoosh

实现全文索引和搜索

import os
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT, ID, DATETIME, NUMERIC
from whoosh.qparser import QueryParser, MultifieldParser
import whoosh.index as index
import datetimeclass FullTextSearcher:def __init__(self, index_dir="fulltext_index"):"""初始化全文搜索器Args:index_dir: 索引目录"""self.index_dir = index_dirself._init_index()def _init_index(self):"""初始化索引"""# 创建索引目录if not os.path.exists(self.index_dir):os.makedirs(self.index_dir)# 定义文档模式self.schema = Schema(path=ID(stored=True, unique=True),content=TEXT(stored=True),name=TEXT(stored=True),extension=TEXT(stored=True),size=NUMERIC(stored=True),modified_time=DATETIME(stored=True))# 创建或打开索引if not index.exists_in(self.index_dir):self.ix = create_in(self.index_dir, self.schema)else:self.ix = open_dir(self.index_dir)def _extract_text_content(self, file_path):"""提取文件的文本内容Args:file_path: 文件路径Returns:文件的文本内容"""try:# 这里只处理简单的文本文件,实际应用中需要处理更多文件类型with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:return f.read()except Exception as e:print(f"读取文件 {file_path} 内容时出错: {e}")return ""def index_file(self, file_path):"""索引单个文件Args:file_path: 文件路径"""try:# 提取文件元数据metadata = extract_file_metadata(file_path)if not metadata:return# 提取文件内容content = self._extract_text_content(file_path)# 添加到索引writer = self.ix.writer()writer.add_document(path=file_path,content=content,name=metadata['name'],extension=metadata['extension'],size=metadata['size'],modified_time=metadata['modified_time'])writer.commit()except Exception as e:print(f"索引文件 {file_path} 时出错: {e}")def index_directory(self, directory):"""索引目录中的所有文本文件Args:directory: 目录路径"""start_time = time.time()# 遍历目录file_list = traverse_directory(directory)# 过滤出文本文件text_extensions = ['.txt', '.md', '.py', '.java', '.c', '.cpp', '.h', '.html', '.css', '.js', '.json', '.xml']text_files = [f for f in file_list if os.path.splitext(f)[1].lower() in text_extensions]# 索引每个文件writer = self.ix.writer()indexed_count = 0for file_path in text_files:try:# 提取文件元数据metadata = extract_file_metadata(file_path)if not metadata:continue# 提取文件内容content = self._extract_text_content(file_path)# 添加到索引writer.add_document(path=file_path,content=content,name=metadata['name'],extension=metadata['extension'],size=metadata['size'],modified_time=metadata['modified_time'])indexed_count += 1except Exception as e:print(f"索引文件 {file_path} 时出错: {e}")# 提交所有更改writer.commit()end_time = time.time()print(f"全文索引完成,共索引了 {indexed_count} 个文本文件,耗时 {end_time - start_time:.2f} 秒")def search(self, query_text, field="content"):"""搜索文件内容Args:query_text: 查询文本field: 要搜索的字段,默认为"content"Returns:匹配的文件列表"""results = []with self.ix.searcher() as searcher:query = QueryParser(field, self.ix.schema).parse(query_text)search_results = searcher.search(query, limit=None)for result in search_results:results.append({'path': result['path'],'name': result['name'],'extension': result['extension'],'size': result['size'],'modified_time': result['modified_time'],'content_snippet': result.highlights("content")})return resultsdef multi_field_search(self, query_text, fields=["content", "name"]):"""在多个字段中搜索Args:query_text: 查询文本fields: 要搜索的字段列表Returns:匹配的文件列表"""results = []with self.ix.searcher() as searcher:query = MultifieldParser(fields, self.ix.schema).parse(query_text)search_results = searcher.search(query, limit=None)for result in search_results:results.append({'path': result['path'],'name': result['name'],'extension': result['extension'],'size': result['size'],'modified_time': result['modified_time'],'content_snippet': result.highlights("content") if "content" in fields else ""})return results

处理不同类型的文件

在实际应用中,我们需要处理各种类型的文件,如PDF、Word文档、Excel表格等。这里我们可以使用第三方库来提取这些文件的文本内容。

def extract_text_from_file(file_path):"""从各种类型的文件中提取文本内容Args:file_path: 文件路径Returns:文件的文本内容"""file_ext = os.path.splitext(file_path)[1].lower()try:# 纯文本文件if file_ext in ['.txt', '.md', '.py', '.java', '.c', '.cpp', '.h', '.html', '.css', '.js', '.json', '.xml']:with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:return f.read()# PDF文件elif file_ext == '.pdf':try:import PyPDF2with open(file_path, 'rb') as f:reader = PyPDF2.PdfReader(f)text = ""for page in reader.pages:text += page.extract_text() + "\n"return textexcept ImportError:print("PyPDF2库未安装,无法提取PDF文本")return ""# Word文档elif file_ext in ['.doc', '.docx']:try:import docxdoc = docx.Document(file_path)return "\n".join([para.text for para in doc.paragraphs])except ImportError:print("python-docx库未安装,无法提取Word文档文本")return ""# Excel表格elif file_ext in ['.xls', '.xlsx']:try:import pandas as pddf = pd.read_excel(file_path)return df.to_string()except ImportError:print("pandas库未安装,无法提取Excel表格文本")return ""# 其他类型的文件else:print(f"不支持的文件类型: {file_ext}")return ""except Exception as e:print(f"提取文件 {file_path} 的文本内容时出错: {e}")return ""

自然语言处理增强搜索

为了进一步提高搜索的智能性,我们可以使用自然语言处理(NLP)技术来增强搜索功能。这包括关键词提取、同义词扩展、情感分析等。

安装必要的NLP库

pip install nltk spacy gensim

关键词提取

import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from collections import Counter# 下载必要的NLTK数据
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')def extract_keywords(text, top_n=10):"""从文本中提取关键词Args:text: 输入文本top_n: 返回的关键词数量Returns:关键词列表"""# 分词tokens = word_tokenize(text.lower())# 去除停用词和标点符号stop_words = set(stopwords.words('english'))tokens = [token for token in tokens if token.isalpha() and token not in stop_words]# 词形还原lemmatizer = WordNetLemmatizer()tokens = [lemmatizer.lemmatize(token) for token in tokens]# 计算词频word_freq = Counter(tokens)# 返回频率最高的词return [word for word, _ in word_freq.most_common(top_n)]

同义词扩展

from nltk.corpus import wordnetdef get_synonyms(word):"""获取单词的同义词Args:word: 输入单词Returns:同义词列表"""synonyms = set()for syn in wordnet.synsets(word):for lemma in syn.lemmas():synonyms.add(lemma.name().replace('_', ' '))return list(synonyms)def expand_query(query):"""扩展查询,添加同义词Args:query: 原始查询字符串Returns:扩展后的查询字符串"""# 分词tokens = word_tokenize(query.lower())# 去除停用词stop_words = set(stopwords.words('english'))tokens = [token for token in tokens if token.isalpha() and token not in stop_words]# 为每个词添加同义词expanded_tokens = []for token in tokens:expanded_tokens.append(token)synonyms = get_synonyms(token)[:3]  # 限制同义词数量expanded_tokens.extend(synonyms)# 构建扩展查询expanded_query = " OR ".join(expanded_tokens)return expanded_query### 使用词嵌入进行语义搜索传统的关键词匹配搜索可能会错过语义相关但没有共同关键词的文档。使用词嵌入(Word Embeddings)可以捕捉词语之间的语义关系,实现更智能的搜索。```python
import numpy as np
from gensim.models import Word2Vec
from sklearn.metrics.pairwise import cosine_similarityclass SemanticSearcher:def __init__(self, documents=None):"""初始化语义搜索器Args:documents: 文档列表,每个文档是一个字符串"""self.documents = documents or []self.document_vectors = []self.model = Noneif documents:self.train_model()def preprocess_text(self, text):"""预处理文本Args:text: 输入文本Returns:分词后的列表"""# 分词tokens = word_tokenize(text.lower())# 去除停用词和标点符号stop_words = set(stopwords.words('english'))tokens = [token for token in tokens if token.isalpha() and token not in stop_words]return tokensdef add_document(self, document):"""添加文档Args:document: 文档文本"""self.documents.append(document)def train_model(self, vector_size=100, window=5, min_count=1):"""训练词嵌入模型Args:vector_size: 词向量维度window: 上下文窗口大小min_count: 词语最小出现次数"""# 预处理所有文档processed_docs = [self.preprocess_text(doc) for doc in self.documents]# 训练Word2Vec模型self.model = Word2Vec(sentences=processed_docs,vector_size=vector_size,window=window,min_count=min_count,workers=4)# 计算每个文档的向量表示self.document_vectors = []for doc in processed_docs:doc_vector = self._get_document_vector(doc)self.document_vectors.append(doc_vector)def _get_document_vector(self, tokens):"""计算文档的向量表示Args:tokens: 分词后的文档Returns:文档向量"""# 计算所有词向量的平均值word_vectors = []for token in tokens:if token in self.model.wv:word_vectors.append(self.model.wv[token])if not word_vectors:return np.zeros(self.model.vector_size)return np.mean(word_vectors, axis=0)def search(self, query, top_n=5):"""语义搜索Args:query: 查询文本top_n: 返回的结果数量Returns:相似度最高的文档索引和相似度分数"""# 预处理查询query_tokens = self.preprocess_text(query)# 计算查询向量query_vector = self._get_document_vector(query_tokens)# 计算与每个文档的相似度similarities = []for doc_vector in self.document_vectors:# 计算余弦相似度similarity = cosine_similarity(query_vector.reshape(1, -1),doc_vector.reshape(1, -1))[0][0]similarities.append(similarity)# 获取相似度最高的文档top_indices = np.argsort(similarities)[-top_n:][::-1]results = [(idx, similarities[idx]) for idx in top_indices]return results

使用预训练模型进行语义搜索

对于更高级的语义搜索,我们可以使用预训练的语言模型,如BERT、GPT等。这里我们使用Sentence-BERT,它专门用于生成句子的语义表示。

from sentence_transformers import SentenceTransformerclass AdvancedSemanticSearcher:def __init__(self, model_name='paraphrase-MiniLM-L6-v2'):"""初始化高级语义搜索器Args:model_name: 预训练模型名称"""self.model = SentenceTransformer(model_name)self.documents = []self.document_embeddings = Nonedef add_documents(self, documents):"""添加文档Args:documents: 文档列表"""self.documents.extend(documents)# 计算文档嵌入self.document_embeddings = self.model.encode(self.documents)def search(self, query, top_n=5):"""语义搜索Args:query: 查询文本top_n: 返回的结果数量Returns:相似度最高的文档索引和相似度分数"""# 计算查询嵌入query_embedding = self.model.encode([query])[0]# 计算与每个文档的相似度similarities = cosine_similarity([query_embedding],self.document_embeddings)[0]# 获取相似度最高的文档top_indices = np.argsort(similarities)[-top_n:][::-1]results = [(idx, similarities[idx], self.documents[idx]) for idx in top_indices]return results
http://www.xdnf.cn/news/15761.html

相关文章:

  • 【Oracle】ORACLE OMF说明
  • AUTOSAR进阶图解==>AUTOSAR_SWS_DiagnosticLogAndTrace
  • Redisson RLocalCachedMap 核心参详解
  • kotlin部分常用特性总结
  • Ultralytics代码详细解析(三:engine->trainer.py主框架)
  • LVS——nat模式
  • 电机相关常见名词
  • 如何解决Flink CDC同步时间类型字段8小时时间差的问题,以MySQL为例
  • Redis Sentinel哨兵集群
  • Spring之【AnnotatedBeanDefinitionReader】
  • 针对大规模语言模型的上下文工程技术调研与总结(翻译并摘要)
  • 【C++】入门阶段
  • 基于开放API接口采集的定制开发开源AI智能名片S2B2C商城小程序数据整合与增长策略研究
  • 本地部署开源的 AI 驱动的搜索引擎 Perplexica 并实现外部访问
  • Spring Bean 的作用域(Bean Scope)
  • SpringAI_Chat模型_DeepSeek模型--基础对话
  • 扭蛋机系统开发:打造多元化娱乐生态的新引擎
  • Libevent(3)之使用教程(2)创建事件
  • Spring MVC @RequestParam注解全解析
  • 【Linux】重生之从零开始学习运维之Nginx之server小实践
  • 最新版vscode 连接ubuntu 18.04 保姆级教程
  • 编程实现Word自动排版:从理论到实践的全面指南
  • SurfaceView、TextureView、SurfaceTexture 和 GLSurfaceView
  • 【Android】ListView与RecyclerView的基础使用
  • 【unity游戏开发入门到精通——3D篇】3D光源之——unity使用Lens Flare (SRP) 组件实现太阳耀斑镜头光晕效果
  • C++实现单层时间轮
  • 4644电源管理芯片在微波射频组件中的技术优势与国产化实践
  • Linux驱动学习day24(UART子系统)
  • Ubuntu系统下快速体验iperf3工具(网络性能测试)
  • 嵌入式Linux:什么是线程?