GO学习记录九——数据库触发器的使用+redis缓存策略
一、数据库添加触发器
1.使用的pgsql进行的学习。
2.触发器简单理解就是类似c#中的事件委托,类似观察者模式,在数据增删改时注册事件,在对应的操作中会触发回调函数。
3.触发器属于配置项,只需要在创建数据库表时,配置一次即可,使用go代码或直接操作数据库都可以。
--创建触发器函数
CREATE OR REPLACE FUNCTION table1_notify_func()
RETURNS TRIGGER AS $$
BEGINPERFORM pg_notify('table1_changes',json_build_object('operation', TG_OP,'id', COALESCE(NEW.id, OLD.id),'test1', COALESCE(NEW.test1, OLD.test1),'timestamp', NOW())::TEXT);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
table1=数据库表名。
table1_changes=频道名称,就是表产生变化,数据库广播出来的主题,不同应用场景叫法不一样。
json_build_object=触发广播时携带的消息内容,json格式。
--创建触发器(PostgreSQL 10 语法)
CREATE TRIGGER table1_notify_trgAFTER INSERT OR UPDATE OR DELETEON table1FOR EACH ROWEXECUTE PROCEDURE table1_notify_func();
table1_notify_trg=触发器名称。
AFTER=在操作执行后触发回调。
INSERT OR UPDATE OR DELETE=在插入、更新、删除操作时。
table1=监听的表名。
FOR EACH ROW=应该是每一行。
table1_notify_func=执行的回调函数,在上方操作中定义的函数。
二、go使用触发器部分的代码
非完整代码。
func main() {//监听配置的表名tableNames := watchedTables//为每个表添加监听var wg sync.WaitGroup// 启动通知监听器wg.Add(len(tableNames))for _, tableName := range tableNames {go ListenForNotifications(ctx, db, &wg, tableName+"_changes", OnChangeGeneric)}
}// OnChangeCallback 定义回调函数类型
// payload: NOTIFY 发送的消息内容(如 "INSERT: id=1")
// timestamp: 收到通知的时间
type OnChangeCallback func(tableName, payload string, timestamp time.Time)// listenForNotifications 使用连接池监听通知
func ListenForNotifications(ctx context.Context, pool *pgxpool.Pool, wg *sync.WaitGroup, channel string, onChange OnChangeCallback) {defer wg.Done()// 从连接池获取一个连接用于监听conn, err := pool.Acquire(ctx)if err != nil {LogError("获取监听连接失败: %v", err)return}defer conn.Release()// 监听频道_, err = conn.Exec(ctx, "LISTEN "+channel)if err != nil {LogError("监听频道失败: %v,%v", channel, err)return}LogInfo("开始监听 %v 频道...", channel)for {select {case <-ctx.Done():LogInfo("停止监听通知")returndefault:// 等待通知,设置超时时间ctxWithTimeout, cancel := context.WithTimeout(ctx, 30*time.Second)notification, err := conn.Conn().WaitForNotification(ctxWithTimeout)cancel()if err != nil {if err == context.DeadlineExceeded {// 超时是正常的,继续等待continue}// 如果返回的错误信息包含 "timeout",则继续if strings.Contains(err.Error(), "timeout") {continue}if ctx.Err() != nil {// 上下文被取消,正常退出return}LogError("等待通知失败: %v", err)time.Sleep(1 * time.Second)continue}if notification != nil {if onChange != nil {onChange(channel, notification.Payload, time.Now())}}}}
}
三、完整代码
main.go
package mainimport ("context""crypto/rand""encoding/base64""encoding/json""fmt""io""log""net/http""os""os/signal""path/filepath""regexp""strconv""strings""sync""syscall""time""github.com/gin-gonic/gin""github.com/go-redis/redis/v8""github.com/jackc/pgx/v5""github.com/jackc/pgx/v5/pgxpool"swaggerFiles "github.com/swaggo/files"ginSwagger "github.com/swaggo/gin-swagger"// 注意:替换为你项目的实际路径// _ "your_project/docs" // docs 包,由 swag 生成// 如果 docs 包在根目录,且 main.go 也在根目录,可以这样导入_ "HTTPServices/docs" // 假设 docs 目录在项目根目录下"HTTPServices/redis_model"
)const (tokenLength = 32 // 令牌长度tokenExpiry = 24 * time.Hour // 令牌有效期
)var (validTokens = make(map[string]time.Time) // 有效令牌tokenMutex sync.Mutex // 令牌锁
)// 要监听的表名列表(手动配置)
var watchedTables = []string{"table1","mytable",
}// 记录redis缓存使用的表名
var redisCacheTables = map[string][]string{}type TokenResponse struct {Token string `json:"token"`Expires time.Time `json:"expires"`
}var (db *pgxpool.Poolctx context.Contextcancel context.CancelFunc
)// 启动函数
func main() {// 创建可取消的 contextctx, cancel = context.WithCancel(context.Background())defer cancel() // 确保释放// 初始化 直接使用文件夹名称作为对象,调用里面的方法if err := redis_model.InitRedis("localhost:6379", "", 0); err != nil {LogError("Redis连接失败:%v", err)} else {LogSuccess("Redis连接成功")}testRedis()// 初始化数据库连接db = InitDB()// 获取所有表名 监听的越多性能越差,建议依据项目实际情况进行配置// tableNames, err := GetTableNames(db)// if err != nil {// log.Fatal("获取表名失败:", err)// }//监听配置的表名tableNames := watchedTables//为每个表添加监听var wg sync.WaitGroup// 启动通知监听器wg.Add(len(tableNames))for _, tableName := range tableNames {go ListenForNotifications(ctx, db, &wg, tableName+"_changes", OnChangeGeneric)}// 注册路由RegisterRouter()// 启动 HTTP 服务go func() {StartHTTPServer()}()// 启动 HTTP api测试服务go func() {StartDebugHTTPServer()}()// ========== 6. 阻塞并等待退出信号 ==========waitForShutdown()
}// waitForShutdown 阻塞主线程,等待退出信号,并执行清理
func waitForShutdown() {sigCh := make(chan os.Signal, 1)signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)receivedSig := <-sigChLogInfo("收到退出信号 %v,开始关闭...", receivedSig)// ======== 触发所有监听器停止 ========if cancel != nil {cancel() // ⭐ 关键:触发 context 取消}// 等待监听器关闭(可加超时)//time.Sleep(1 * time.Second)// ======== 执行清理任务 ========clearAllRedisCacheTables() // 清理缓存(业务逻辑)redis_model.Close() // 关闭 Redis 连接if db != nil {db.Close() // 关闭数据库连接}LogInfo("✅ 所有资源已释放,程序安全退出")
}// OnChangeGeneric 是一个通用的数据变更回调函数
func OnChangeGeneric(tableName string, payload string, timestamp time.Time) {LogInfo("🔔 表 [%s] 发生数据变更 | Payload: %s", tableName, payload)//清理指定表的redis缓存clearRedisCacheTables(tableName)//清理记录查询的缓存数据for k := range cacheFindeData {delete(cacheFindeData, k)}// 在这里你可以:// - 根据 tableName 分发到不同处理函数// - 清除 Redis 缓存: redisClient.Del(ctx, "cache:"+tableName)// - 写入日志审计// - 推送到 WebSocket
}// 测试redis操作代码
func testRedis() {LogInfo("===============测试redis================")// String 操作// 像封装好的字典类,如果key存在,下一个加入的值会直接替换value//设置了数据只存在10sredis_model.StringSet("name", "Alice", 10*time.Second)redis_model.StringSet("name", "S", 10*time.Second)redis_model.StringSet("age", "20", 10*time.Second)name, _ := redis_model.StringGet("name")age, _ := redis_model.StringGet("age")LogInfo("Name:%s, Age:%s", name, age)// Hash 操作//可以当作类或者结构体理解,第一个值=类名,之后的是字段名+值//多次设置同一个字段会覆盖之前的值//设置过期时间 10s 如果不设置就作为持久化数据存储了redis_model.Client.Expire(context.Background(), "user:1001", 10*time.Second)redis_model.HashSet("user:1001", "name", "Bob")redis_model.HashSet("user:1001", "age", "25")redis_model.HashSet("user:1001", "age", "30")user, _ := redis_model.HashGetAll("user:1001")name, _ = redis_model.HashGet("user:1001", "name")age, _ = redis_model.HashGet("user:1001", "age")LogInfo("User:%v name=%v age=%v", user, name, age)// List 操作//clear tasksredis_model.Client.Del(context.Background(), "tasks")//设置过期时间redis_model.Client.Expire(context.Background(), "tasks", 10*time.Second)//添加数据//类似list,如果不设置过期时间或者不执行clear,数据会只增不减redis_model.ListRPush("tasks", "A", "B")redis_model.ListRPush("tasks", "task1", "task2", "A", "B")tasks, _ := redis_model.ListRange("tasks", 0, -1)LogInfo("Tasks:%v", tasks)// Set 操作//无序不重复,类似字典+list//清空数据redis_model.Client.Del(context.Background(), "tags")//设置过期时间redis_model.Client.Expire(context.Background(), "tags", 10*time.Second)redis_model.SetAdd("tags", "go", "redis", "cache")redis_model.SetAdd("tags", "go")redis_model.SetAdd("tags", "test")tags, _ := redis_model.SetMembers("tags")LogInfo("Tags:%v", tags)// ZSet 操作//有序集合,Score=进行排列的数据,Member=数据本身//清空数据redis_model.Client.Del(context.Background(), "leaderboard")//设置过期时间redis_model.Client.Expire(context.Background(), "leaderboard", 10*time.Second)//添加数据redis_model.ZAdd("leaderboard", &redis.Z{Score: 100, Member: "alice"}, &redis.Z{Score: 90, Member: "bob"})redis_model.ZAdd("leaderboard", &redis.Z{Score: 80, Member: 99}, &redis.Z{Score: 70, Member: "charlie"})//降序结果top, _ := redis_model.ZRevRange("leaderboard", 0, -1)//升序结果top2, _ := redis_model.ZRange("leaderboard", 0, -1)LogInfo("Top1:%v \nTop2:%v", top, top2)LogInfo("===============测试redis END================")
}// 测试数据库操作代码
func testDB() {// 初始化数据库连接db = InitDB()defer db.Close()// 查询rows, err := db.Query(context.Background(), "SELECT * FROM table1")if err != nil {fmt.Println("查询失败:", err)return}defer rows.Close()for rows.Next() {var id intvar name stringerr = rows.Scan(&id, &name)if err != nil {}}
}// @title Sample API
// @version 1.0
// @description API测试页面
// @host localhost:8080
func StartDebugHTTPServer() {r := gin.Default()// --- 挂载 Swagger UI ---// 访问 http://localhost:8081/swagger/index.html 查看 UIr.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))port := ":8081"LogSuccess("启动 HTTP Swagger测试服务启动,监听端口 %s\n", port)// 启动服务器debugApiError := r.Run(port)if debugApiError != nil {LogError("HTTP api测试服务启动失败:%v", debugApiError)} else {LogSuccess("HTTP api测试服务已启动,监听端口 8081")}
}// 启动 HTTP 服务
func StartHTTPServer() {address := "127.0.0.1:8080" //配置连接ip端口//配置跨域,是影响调试页面不能访问8080相关地址的原因handler := corsMiddleware(http.DefaultServeMux)LogSuccess("启动 HTTP 服务,监听端口 %s\n", address)err := http.ListenAndServe(address, handler)if err != nil {log.Fatalf("服务器启动失败:%v", err)}
}// corsMiddleware 是一个中间件,用于添加 CORS 头
func corsMiddleware(next http.Handler) http.Handler {return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {// 设置 CORS 响应头w.Header().Add("Access-Control-Allow-Origin", "http://localhost:8081") // ✅ 修改为你的前端地址w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS, PATCH")w.Header().Set("Access-Control-Allow-Headers","Origin, Content-Type, Accept, Authorization, X-Requested-With")// 如果需要传递 Cookie 或 Authorization Bearer Tokenw.Header().Set("Access-Control-Allow-Credentials", "true")// 处理预检请求 (OPTIONS)if r.Method == "OPTIONS" {w.WriteHeader(http.StatusOK)return}// 调用下一个处理器 (即注册的路由)next.ServeHTTP(w, r)})
}// 注册路由
func RegisterRouter() {http.HandleFunc("/", helloHandler) //http://localhost:8080/http.HandleFunc("/time", timeHandler) //http://localhost:8080/time//查询http.HandleFunc("/findTable", findTableNameCacheRedisHandler) //http://localhost:8080/findTable?tableName=name//查询使用代码内存缓存http.HandleFunc("/findTableMemoryCache", findTableNameCacheHandler) //http://localhost:8080/findTableMemoryCache?tableName=name//分页查询使用代码内存缓存http.HandleFunc("/findTableNamePageCache", findTableNamePageCacheHandler) //http://localhost:8080/findTableNamePageCache?tableName=name&page=1&pageSize=10//分页查询使用redis缓存http.HandleFunc("/findTableNamePageRedisCache", findTableNamePageRedisCacheHandler) //http://localhost:8080/findTableNamePageRedisCache?tableName=name&page=1&pageSize=10//添加 使用token验证http.HandleFunc("/addTable1", addTable1Handler) //http://localhost:8080/addTable1//删除http.HandleFunc("/deleteTableValue", authMiddleware(deleteTableHandler)) //http://localhost:8080/deleteTableValue?tableName=table1&fieldName=test1&fieldValue=123test//修改http.HandleFunc("/updateTableValue", updateTableHandler) //http://localhost:8080/updateTableValue?tableName=table1&findFieldName=test1&findFieldValue=hello&setFieldName=test3&setFieldValue=456//下载文件http.HandleFunc("/downloadFile", downloadHandler) //http://localhost:8080/downloadFile?filePath=D:\GoProject\HTTPServices\README.md//上传文件http.HandleFunc("/uploadFile", uploadHandler) //http://localhost:8080/uploadFile}// 中间件:验证Token
func authMiddleware(next http.HandlerFunc) http.HandlerFunc {return func(w http.ResponseWriter, r *http.Request) {// 从请求头或查询参数获取tokentoken := r.Header.Get("Authorization")if token == "" {token = r.URL.Query().Get("token")}// 清理token格式token = strings.TrimPrefix(token, "Bearer ")if token == "" {http.Error(w, `{"error": "需要访问令牌"}`, http.StatusUnauthorized)return}if !isValidToken(token) {http.Error(w, `{"error": "无效或过期的令牌"}`, http.StatusUnauthorized)return}// Token有效,继续处理请求next.ServeHTTP(w, r)}
}// 生成随机Token
func generateToken() (string, error) {b := make([]byte, tokenLength)_, err := rand.Read(b)if err != nil {return "", err}return base64.URLEncoding.EncodeToString(b), nil
}// 验证Token有效性
func isValidToken(token string) bool {tokenMutex.Lock()defer tokenMutex.Unlock()expiry, exists := validTokens[token]if !exists {return false}if time.Now().After(expiry) {delete(validTokens, token)return false}// 更新token过期时间validTokens[token] = time.Now().Add(tokenExpiry)return true
}// 清理过期Token
func cleanExpiredTokens() {for {time.Sleep(1 * time.Hour)tokenMutex.Lock()for token, expiry := range validTokens {if time.Now().After(expiry) {delete(validTokens, token)}}tokenMutex.Unlock()}
}// 登录处理器
func loginHandler(w http.ResponseWriter, r *http.Request) {if r.Method != "POST" {http.Error(w, "只支持POST方法", http.StatusMethodNotAllowed)return}// 在实际应用中,这里应该验证用户名和密码// 这里简化处理,直接生成tokentoken, err := generateToken()if err != nil {http.Error(w, `{"error": "生成令牌失败"}`, http.StatusInternalServerError)return}expiry := time.Now().Add(tokenExpiry)tokenMutex.Lock()validTokens[token] = expirytokenMutex.Unlock()w.Header().Set("Content-Type", "application/json")json.NewEncoder(w).Encode(TokenResponse{Token: token,Expires: expiry,})
}// Token验证处理器
func validateHandler(w http.ResponseWriter, r *http.Request) {w.Header().Set("Content-Type", "application/json")json.NewEncoder(w).Encode(map[string]interface{}{"valid": true,"message": "Token有效",})
}// APIResponse 定义了统一的 API 响应格式
type APIResponse struct {Success bool `json:"success"` // 是否成功Status int `json:"status"` // HTTP 状态码Message string `json:"message,omitempty"` // 简短消息 报错时的提示信息Data interface{} `json:"data,omitempty"` // 主要数据内容Timestamp string `json:"timestamp"` // 时间戳 (秒)
}// SendJSONResponse 封装了 JSON 响应的发送逻辑
func SendJSONResponse(w http.ResponseWriter, success bool, status int, message string, data interface{}) {// 设置 Content-Typew.Header().Set("Content-Type", "application/json")// 设置 HTTP 状态码w.WriteHeader(status)// 构造响应体response := APIResponse{Success: success,Status: status,Message: message,Data: data,Timestamp: time.Now().Format("2006-01-02 15:04:05"), // 当前时间戳格式化}// 编码并发送 JSONif err := json.NewEncoder(w).Encode(response); err != nil {// 如果编码失败,记录错误(但不能再次写入 w,因为 Header 已经发送)http.Error(w, "Internal Server Error", http.StatusInternalServerError)// log.Printf("JSON encode error: %v", err) // 取消注释以记录日志}
}// @Summary 根目录测试连接
// @Description
// @Tags tags1
// @Accept json
// @Produce json
// @Router / [get]
func helloHandler(w http.ResponseWriter, r *http.Request) {LogInfo("访问路径:%s,来源:%s\n", r.URL.Path, r.RemoteAddr)// 编码 JSON 响应SendJSONResponse(w, true, http.StatusOK, "成功", "访问了根目录 Hello, World! 👋")
}// @Summary 查询服务器时间
// @Description
// @Tags tags1
// @Accept json
// @Produce json
// @Router /time [get]
func timeHandler(w http.ResponseWriter, r *http.Request) {LogInfo("访问路径:%s,来源:%s\n", r.URL.Path, r.RemoteAddr)currentTime := time.Now().Format("2006-01-02 15:04:05")// ✅ 设置响应头SendJSONResponse(w, true, http.StatusOK, "成功", currentTime)
}// @Summary 修改指定表名中,find字段名等于指定值的set字段名的数据
// @Description 根据提供的表名、find字段名、find字段值、set字段名、set字段值,修改数据库中的数据。
// @Tags tags1
// @Produce json
// @Param tableName query string true "要查询的数据库表名" default(table1)
// @Param fieldName query string true "要查询的字段名"
// @Param fieldValue query string true "要查询的字段值"
// @Param setFieldName query string true "要更新的字段名"
// @Param setFieldValue query string true "要更新的字段值"
// @Router /updateTableValue [get]
func updateTableHandler(w http.ResponseWriter, r *http.Request) {// 解析请求参数tableName := r.URL.Query().Get("tableName")findFieldName := r.URL.Query().Get("findFieldName")findFieldValue := r.URL.Query().Get("findFieldValue")setFieldName := r.URL.Query().Get("setFieldName")setFieldValue := r.URL.Query().Get("setFieldValue")// 完整的参数验证if tableName == "" || findFieldName == "" || setFieldName == "" {http.Error(w, "缺少必要参数", http.StatusBadRequest)return}// 🔐 白名单验证 - 只允许预定义的表和字段allowedTables := map[string]bool{"table1": true, "table2": true}allowedFields := map[string]bool{"test1": true, "test2": true, "test3": true,"test4": true, "test5": true, "test6": true, "test7": true,}if !allowedTables[tableName] {http.Error(w, "不允许的表名", http.StatusBadRequest)return}if !allowedFields[findFieldName] || !allowedFields[setFieldName] {http.Error(w, "不允许的字段名", http.StatusBadRequest)return}// ✅ 使用参数化查询,表名和字段名通过白名单验证后拼接query := fmt.Sprintf("UPDATE %s SET %s = $1 WHERE %s = $2",tableName, setFieldName, findFieldName,)result, err := db.Exec(context.Background(), query, setFieldValue, findFieldValue)if err != nil {http.Error(w, "更新数据失败: "+err.Error(), http.StatusInternalServerError)return}// 检查是否实际更新了数据rowsAffected := result.RowsAffected()if rowsAffected == 0 {http.Error(w, "未找到匹配的数据进行更新", http.StatusNotFound)return}SendJSONResponse(w, true, http.StatusOK, "成功", fmt.Sprintf("%d 行已更新", rowsAffected))}// @Summary 删除指定表名中,指定字段名等于指定值的数据
// @Description 根据提供的表名和字段名和值,删除数据库中的数据。
// @Tags tags1
// @Produce json
// @Param tableName query string true "要删除的数据库表名"
// @Param fieldName query string true "要删除的字段名"
// @Param fieldValue query string true "要删除的字段值"
// @Router /deleteTableValue [get]
func deleteTableHandler(w http.ResponseWriter, r *http.Request) {// 解析请求参数tableName := r.URL.Query().Get("tableName")fieldName := r.URL.Query().Get("fieldName")fieldValue := r.URL.Query().Get("fieldValue")if tableName == "" || fieldName == "" || fieldValue == "" {http.Error(w, "参数错误", http.StatusBadRequest)return}// 执行 SQL 语句,使用参数化查询query := fmt.Sprintf("DELETE FROM %s WHERE %s = $1", tableName, fieldName)_, err := db.Exec(context.Background(), query, fieldValue)if err != nil {http.Error(w, "删除数据失败: "+err.Error(), http.StatusInternalServerError)return}SendJSONResponse(w, true, http.StatusOK, "成功", "数据已删除")
}// @Summary 向table1表中添加数据,字段名=test1,test2,test3,test4,test5,test6,test7
// @Description 根据提供的json数据,向数据库table1中添加数据。
// @Tags tags1
// @Produce json
// @Param data body string true "要插入的数据对象"
// @Router /addTable1 [post]
func addTable1Handler(w http.ResponseWriter, r *http.Request) {// 定义需要插入的数据结构type requestData struct {Test1 string `json:"test1"`Test2 CustomTime `json:"test2"`Test3 uint32 `json:"test3"`Test4 string `json:"test4"`Test5 float64 `json:"test5"`Test6 int32 `json:"test6"`Test7 float64 `json:"test7"`}// 解析请求参数var data requestDataerr := json.NewDecoder(r.Body).Decode(&data)if err != nil {http.Error(w, "解析请求参数失败: "+err.Error(), http.StatusBadRequest)return}// 执行 SQL 语句,使用参数化查询query := "INSERT INTO table1 (test1, test2, test3, test4, test5, test6, test7) VALUES ($1, $2, $3, $4, $5, $6, $7)"_, err = db.Exec(context.Background(), query, data.Test1, data.Test2.Time, data.Test3, data.Test4, data.Test5, data.Test6, data.Test7)if err != nil {http.Error(w, "插入数据失败: "+err.Error(), http.StatusInternalServerError)return}//清理指定表的redis缓存clearRedisCacheTables("table1")SendJSONResponse(w, true, http.StatusOK, "成功", "数据已插入")
}// @Summary 查询指定表名的全部数据,使用redis缓存数据
// @Description 根据提供的表名查询数据库中的所有数据。
// @Tags tags1
// @Produce json
// @Param tableName query string true "要查询的数据库表名" default(table1)
// @Router /findTable [get]
func findTableNameCacheRedisHandler(w http.ResponseWriter, r *http.Request) {defer TimeTrack(time.Now(), "findTableNameCacheRedisHandler")tableName := r.URL.Query().Get("tableName")if tableName == "" {http.Error(w, "tableName is empty", http.StatusBadRequest)return}// ✅ 安全校验表名(防止 SQL 注入)if !isValidTableName(tableName) {http.Error(w, "invalid table name", http.StatusBadRequest)return}//使用redis缓存数据cacheKey := "findTableNameHandler_" + tableNamecount, err := redis_model.StringExists(cacheKey)if count {jsonData, _ := redis_model.StringGet(cacheKey)SendJSONResponse(w, true, http.StatusOK, "成功", json.RawMessage(jsonData))return}// ✅ 使用参数化方式拼接表名(仅限对象名,如表、字段)query := fmt.Sprintf("SELECT * FROM %s", tableName)rows, err := db.Query(context.Background(), query)if err != nil {http.Error(w, "查询失败: "+err.Error(), http.StatusInternalServerError)return}defer rows.Close()// ✅ 使用 pgx 内置工具自动转为 []map[string]interface{}data, err := pgx.CollectRows(rows, pgx.RowToMap)if err != nil {http.Error(w, "解析数据失败: "+err.Error(), http.StatusInternalServerError)return}//写入redis缓存数据if jsonData, err := json.Marshal(data); err == nil {redis_model.StringSet(cacheKey, string(jsonData), 0)//记录已缓存的key,在数据变化时,删除缓存redisCacheTables[tableName] = append(redisCacheTables[tableName], cacheKey)}SendJSONResponse(w, true, http.StatusOK, "成功", data)
}// 记录查询的缓存数据
var cacheFindeData = make(map[string]string)// @Summary 查询指定表名的全部数据,使用代码内存缓存数据
// @Description 根据提供的表名查询数据库中的所有数据。
// @Tags tags1
// @Produce json
// @Param tableName query string true "要查询的数据库表名" default(table1)
// @Router /findTableMemoryCache [get]
func findTableNameCacheHandler(w http.ResponseWriter, r *http.Request) {defer TimeTrack(time.Now(), "findTableNameCacheHandler")tableName := r.URL.Query().Get("tableName")if tableName == "" {http.Error(w, "tableName is empty", http.StatusBadRequest)return}// ✅ 安全校验表名(防止 SQL 注入)if !isValidTableName(tableName) {http.Error(w, "invalid table name", http.StatusBadRequest)return}//使用代码内存缓存数据cacheKey := "findTableNameHandler_" + tableNameif cacheFindeData[cacheKey] != "" {jsonData, _ := json.Marshal(cacheFindeData[cacheKey])SendJSONResponse(w, true, http.StatusOK, "成功", json.RawMessage(jsonData))return}// ✅ 使用参数化方式拼接表名(仅限对象名,如表、字段)query := fmt.Sprintf("SELECT * FROM %s", tableName)rows, err := db.Query(context.Background(), query)if err != nil {http.Error(w, "查询失败: "+err.Error(), http.StatusInternalServerError)return}defer rows.Close()// ✅ 使用 pgx 内置工具自动转为 []map[string]interface{}data, err := pgx.CollectRows(rows, pgx.RowToMap)if err != nil {http.Error(w, "解析数据失败: "+err.Error(), http.StatusInternalServerError)return}//写入缓存jsonData, err := json.Marshal(data)if err == nil {cacheFindeData[cacheKey] = string(jsonData)}SendJSONResponse(w, true, http.StatusOK, "成功", data)
}// @Summary 分页查询指定表名的全部数据,使用redis存缓存数据
// @Description 根据提供的表名查询数据库中的所有数据。
// @Tags tags1
// @Produce json
// @Param tableName query string true "要查询的数据库表名" default(table1)
// @Param page query int true "当前页码" default(1)
// @Param pageSize query int true "每页大小" default(20)
// @Router /findTableNamePageRedisCache [get]
func findTableNamePageRedisCacheHandler(w http.ResponseWriter, r *http.Request) {defer TimeTrack(time.Now(), "findTableNameCacheHandler")// 1. 获取查询参数tableName := r.URL.Query().Get("tableName")pageStr := r.URL.Query().Get("page")pageSizeStr := r.URL.Query().Get("pageSize")if tableName == "" {http.Error(w, "tableName is empty", http.StatusBadRequest)return}// ✅ 安全校验表名(防止 SQL 注入)if !isValidTableName(tableName) {http.Error(w, "invalid table name", http.StatusBadRequest)return}// 2. 解析分页参数(默认第1页,每页20条)page := 1pageSize := 20if pageStr != "" {if p, err := strconv.Atoi(pageStr); err == nil && p >= 1 {page = p}}if pageSizeStr != "" {if ps, err := strconv.Atoi(pageSizeStr); err == nil && ps >= 1 && ps <= 1000 {pageSize = ps // 限制最大页大小,防止内存溢出}}offset := (page - 1) * pageSize// 3. 缓存键包含分页信息cacheKey := fmt.Sprintf("findTableNameHandler_%s_page%d_size%d", tableName, page, pageSize)count, _ := redis_model.StringExists(cacheKey)if count {jsonData, _ := redis_model.StringGet(cacheKey)SendJSONResponse(w, true, http.StatusOK, "成功", json.RawMessage(jsonData))return}// 4. 查询总记录数(用于分页显示)var totalCount int64countQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s", tableName)err := db.QueryRow(context.Background(), countQuery).Scan(&totalCount)if err != nil {http.Error(w, "查询总数失败: "+err.Error(), http.StatusInternalServerError)return}// 5. 分页查询数据query := fmt.Sprintf("SELECT * FROM %s LIMIT $1 OFFSET $2", tableName)rows, err := db.Query(context.Background(), query, pageSize, offset)if err != nil {http.Error(w, "查询数据失败: "+err.Error(), http.StatusInternalServerError)return}defer rows.Close()// 转为 []map[string]interface{}data, err := pgx.CollectRows(rows, pgx.RowToMap)if err != nil {http.Error(w, "解析数据失败: "+err.Error(), http.StatusInternalServerError)return}// 6. 构造分页响应(✅ 已完整补全)response := map[string]interface{}{"total": totalCount, // 总记录数"page": page, // 当前页码"pageSize": pageSize, // 每页大小"data": data, // 当前页数据}// 7. 序列化响应并缓存jsonData, err := json.Marshal(response)if err == nil {redis_model.StringSet(cacheKey, string(jsonData), 0)//记录已缓存的key,在数据变化时,删除缓存redisCacheTables[tableName] = append(redisCacheTables[tableName], cacheKey)}// 8. 返回 JSON 响应SendJSONResponse(w, true, http.StatusOK, "成功", response)
}// @Summary 分页查询指定表名的全部数据,使用代码内存缓存数据
// @Description 根据提供的表名查询数据库中的所有数据。
// @Tags tags1
// @Produce json
// @Param tableName query string true "要查询的数据库表名" default(table1)
// @Param page query int true "当前页码" default(1)
// @Param pageSize query int true "每页大小" default(20)
// @Router /findTableNamePageCache [get]
func findTableNamePageCacheHandler(w http.ResponseWriter, r *http.Request) {defer TimeTrack(time.Now(), "findTableNameCacheHandler")// 1. 获取查询参数tableName := r.URL.Query().Get("tableName")pageStr := r.URL.Query().Get("page")pageSizeStr := r.URL.Query().Get("pageSize")if tableName == "" {http.Error(w, "tableName is empty", http.StatusBadRequest)return}// ✅ 安全校验表名(防止 SQL 注入)if !isValidTableName(tableName) {http.Error(w, "invalid table name", http.StatusBadRequest)return}// 2. 解析分页参数(默认第1页,每页20条)page := 1pageSize := 20if pageStr != "" {if p, err := strconv.Atoi(pageStr); err == nil && p >= 1 {page = p}}if pageSizeStr != "" {if ps, err := strconv.Atoi(pageSizeStr); err == nil && ps >= 1 && ps <= 1000 {pageSize = ps // 限制最大页大小,防止内存溢出}}offset := (page - 1) * pageSize// 3. 缓存键包含分页信息cacheKey := fmt.Sprintf("findTableNameHandler_%s_page%d_size%d", tableName, page, pageSize)if jsonData, found := cacheFindeData[cacheKey]; found {SendJSONResponse(w, true, http.StatusOK, "成功", json.RawMessage(jsonData))return}// 4. 查询总记录数(用于分页显示)var totalCount int64countQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s", tableName)err := db.QueryRow(context.Background(), countQuery).Scan(&totalCount)if err != nil {http.Error(w, "查询总数失败: "+err.Error(), http.StatusInternalServerError)return}// 5. 分页查询数据query := fmt.Sprintf("SELECT * FROM %s LIMIT $1 OFFSET $2", tableName)rows, err := db.Query(context.Background(), query, pageSize, offset)if err != nil {http.Error(w, "查询数据失败: "+err.Error(), http.StatusInternalServerError)return}defer rows.Close()// 转为 []map[string]interface{}data, err := pgx.CollectRows(rows, pgx.RowToMap)if err != nil {http.Error(w, "解析数据失败: "+err.Error(), http.StatusInternalServerError)return}// 6. 构造分页响应(✅ 已完整补全)response := map[string]interface{}{"total": totalCount, // 总记录数"page": page, // 当前页码"pageSize": pageSize, // 每页大小"data": data, // 当前页数据}// 7. 序列化响应并缓存jsonData, err := json.Marshal(response)if err == nil {cacheFindeData[cacheKey] = string(jsonData)}// 8. 返回 JSON 响应SendJSONResponse(w, true, http.StatusOK, "成功", response)
}// 安全校验表名(防止 SQL 注入)
func isValidTableName(name string) bool {// 只允许字母、数字、下划线,且不能以数字开头matched, _ := regexp.MatchString(`^[a-zA-Z_][a-zA-Z0-9_]*$`, name)return matched
}// @Summary 下载文件
// @Description
// @Tags tags1
// @Produce json
// @Router /downloadFile [get]
func downloadHandler(w http.ResponseWriter, r *http.Request) {fileName := "testdownloadFIle.txt"// 要下载的文件路径filePath := "./files/" + fileName // 假设文件在项目根目录下的 files 文件夹中// 设置响应头,提示浏览器下载(可选)w.Header().Set("Content-Disposition", "attachment; filename="+fileName)// 如果不设置,浏览器可能会尝试直接打开文件(如PDF、图片)// 使用 http.ServeFile 提供文件http.ServeFile(w, r, filePath)
}// uploadHandler 处理文件上传
// @Summary 上传文件
// @Description 支持上传任意文件,最大 50MB
// @Tags tags1
// @Accept multipart/form-data
// @Produce json
// @Param file formData file true "要上传的文件"
// @Success 200 {string} string "文件上传成功"
// @Failure 400 {object} map[string]string "请求错误,如文件太大或格式错误"
// @Failure 500 {object} map[string]string "服务器内部错误"
// @Router /uploadFile [post]
func uploadHandler(w http.ResponseWriter, r *http.Request) {if r.Method != "POST" {http.Error(w, "只支持POST方法", http.StatusMethodNotAllowed)return}// 解析 multipart form 文件大小限制if err := r.ParseMultipartForm(50 << 20); err != nil { // 50MB限制http.Error(w, "文件太大", http.StatusBadRequest)return}// 获取文件file, handler, err := r.FormFile("file")if err != nil {http.Error(w, "获取文件失败: "+err.Error(), http.StatusBadRequest)return}defer file.Close()// 创建目标文件filename := fmt.Sprintf("%d_%s", time.Now().UnixNano(), handler.Filename)dstPath := filepath.Join("./files/", filename)dst, err := os.Create(dstPath)if err != nil {http.Error(w, "创建文件失败: "+err.Error(), http.StatusInternalServerError)return}defer dst.Close()// 复制文件内容if _, err := io.Copy(dst, file); err != nil {http.Error(w, "保存文件失败: "+err.Error(), http.StatusInternalServerError)return}// 返回响应SendJSONResponse(w, true, http.StatusOK, "成功", "文件上传成功")
}// 清空redisCacheTables 使用的数据库表的缓存
func clearRedisCacheTables(tableName string) {for i := 0; i < len(redisCacheTables[tableName]); i++ {redis_model.StringDel(redisCacheTables[tableName][i])}
}// 清空redisCacheTables所有的缓存数据
func clearAllRedisCacheTables() {//循环redisCacheTables所有内容for _, cacheKeys := range redisCacheTables {//循环缓存keyfor i := 0; i < len(cacheKeys); i++ {//删除缓存redis_model.StringDel(cacheKeys[i])}}
}
tools.go
package main//引用的包
import ("context""database/sql""fmt""log""strconv""strings""sync""time""github.com/jackc/pgx/v5/pgxpool" //pgsql数据库组件"github.com/xuri/excelize/v2" //解析excel文件包
)// 定义数据库相关配置
const (host = "localhost" //数据库ipport = 5432 //数据库端口user = "postgres" //数据库用户名password = "postgres" //数据库密码dbname = "postgresLearning" //数据库名
)// 初始化数据库连接
func InitDB() *pgxpool.Pool {// 构建连接字符串psqlInfo := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",host, port, user, password, dbname)// 连接数据库pool, err := pgxpool.New(context.Background(), psqlInfo)//db, err := sql.Open("postgres", psqlInfo)if err != nil {log.Fatal(err)}//defer pool.Close() 这里有一个注意点,这块代码回直接关闭数据库连接// 检查连接err = pool.Ping(context.Background())if err != nil {log.Fatal(err)}LogSuccess("Successfully connected to PostgreSQL database!")return pool
}// OnChangeCallback 定义回调函数类型
// payload: NOTIFY 发送的消息内容(如 "INSERT: id=1")
// timestamp: 收到通知的时间
type OnChangeCallback func(tableName, payload string, timestamp time.Time)// listenForNotifications 使用连接池监听通知
func ListenForNotifications(ctx context.Context, pool *pgxpool.Pool, wg *sync.WaitGroup, channel string, onChange OnChangeCallback) {defer wg.Done()// 从连接池获取一个连接用于监听conn, err := pool.Acquire(ctx)if err != nil {LogError("获取监听连接失败: %v", err)return}defer conn.Release()// 监听频道_, err = conn.Exec(ctx, "LISTEN "+channel)if err != nil {LogError("监听频道失败: %v,%v", channel, err)return}LogInfo("开始监听 %v 频道...", channel)for {select {case <-ctx.Done():LogInfo("停止监听通知")returndefault:// 等待通知,设置超时时间ctxWithTimeout, cancel := context.WithTimeout(ctx, 30*time.Second)notification, err := conn.Conn().WaitForNotification(ctxWithTimeout)cancel()if err != nil {if err == context.DeadlineExceeded {// 超时是正常的,继续等待continue}// 如果返回的错误信息包含 "timeout",则继续if strings.Contains(err.Error(), "timeout") {continue}if ctx.Err() != nil {// 上下文被取消,正常退出return}LogError("等待通知失败: %v", err)time.Sleep(1 * time.Second)continue}if notification != nil {if onChange != nil {onChange(channel, notification.Payload, time.Now())}}}}
}// GetTableNames 查询当前数据库中所有的普通表(不包括系统表、视图等)
func GetTableNames(pool *pgxpool.Pool) ([]string, error) {const query = `SELECT table_nameFROM information_schema.tablesWHERE table_schema = 'public' -- 只查 public 模式AND table_type = 'BASE TABLE' -- 只查基本表(排除视图)ORDER BY table_name;`rows, err := pool.Query(context.Background(), query)if err != nil {return nil, fmt.Errorf("执行查询失败: %w", err)}defer rows.Close()var tableNames []stringfor rows.Next() {var tableName stringif err := rows.Scan(&tableName); err != nil {return nil, fmt.Errorf("读取表名失败: %w", err)}tableNames = append(tableNames, tableName)}if err := rows.Err(); err != nil {return nil, fmt.Errorf("遍历结果出错: %w", err)}return tableNames, nil
}// 读取Excel文件
func ReadExcel(path string, showLog bool) []Table {createTable := []Table{}// 打开Excel文件f, err := excelize.OpenFile(path)if err != nil {fmt.Println(err)return createTable}// 获取工作表名称列表sheetNames := f.GetSheetList()//遍历sheet列表for _, sheetName := range sheetNames {if showLog {LogInfo("开始处理%s工作表", sheetName)}itemTable := Table{Name: sheetName,}// 读取指定工作表的所有行rows, err := f.GetRows(sheetName)if err != nil {LogError("读取%s工作表失败,原因: %v", sheetName, err)continue}for _, row := range rows[1:] {itemColumns := Column{Name: row[0],Type: parseColumnType(row[1]),Length: row[2],NotNull: parseBool(row[3], showLog),Unique: parseBool(row[4], showLog),Primary: parseBool(row[5], showLog),}if len(row) > 6 {itemColumns.Default = row[6]}itemTable.Columns = append(itemTable.Columns, itemColumns)if showLog {// 遍历行中的单元格for _, colCell := range row {LogInfo("%s", colCell)}}}createTable = append(createTable, itemTable)}return createTable
}// 创建数据库表
func createDBTable(db *sql.DB, table Table) {success, createTableSQL := CreateTable(table)if success {LogInfo("sql=%s", createTableSQL)_, err := db.Exec(createTableSQL)if err != nil {LogError("创建%s数据表失败,原因: %v", table.Name, err)} else {LogSuccess("创建%s数据表成功", table.Name)}} else {LogError("创建%s数据表失败,原因: %s", table.Name, createTableSQL)}
}// ColumnItemType 定义列类型的自定义类型
type ColumnItemType string// 支持的列类型常量
const (VARCHAR ColumnItemType = "VARCHAR"TIMESTAMP ColumnItemType = "TIMESTAMP"SERIAL ColumnItemType = "SERIAL"TEXT ColumnItemType = "TEXT"DECIMAL ColumnItemType = "DECIMAL"INT ColumnItemType = "INT"
)func parseColumnType(typeStr string) ColumnItemType {switch strings.ToUpper(typeStr) {case "VARCHAR":return VARCHARcase "TIMESTAMP":return TIMESTAMPcase "SERIAL":return SERIALcase "TEXT":return TEXTcase "DECIMAL":return DECIMALcase "INT":return INT// 处理其他可能的类型default:return ColumnItemType(typeStr) // 如果类型不在预定义的范围内,可以返回原字符串或默认值}
}// 辅助函数,将字符串转换为整数,如果转换失败则打印错误信息并返回0
func mustAtoi(s string, showLog bool) int {i, err := strconv.Atoi(s)if err != nil {if showLog {LogWarning("无法解析int值,原因: %v", err)}return 0 // 或者你可以选择返回一个默认值,或者根据错误处理逻辑来决定}return i
}// 添加一个函数来将字符串转换为布尔值
func parseBool(str string, showLog bool) bool {switch str {case "true", "1", "TRUE", "T", "Y", "YES":return truecase "false", "0", "FALSE", "F", "N", "NO":return falsedefault:// 你可以根据需要处理默认情况,比如记录日志或者返回一个默认值if showLog {LogWarning("无法解析布尔值,设置默认值=false, 原始值=%s", str)}return false}
}// Column 定义字段结构
type Column struct {Name stringType ColumnItemTypeLength string // 长度,仅对 VARCHAR、DECIMAL 等有效NotNull boolUnique boolPrimary boolDefault string
}// Table 定义表结构
type Table struct {Name stringColumns []Column
}// CreateTable 生成 CREATE TABLE SQL 语句
func CreateTable(table Table) (bool, string) {if table.Name == "" {LogError("表名不能为空")return false, ""}if len(table.Columns) == 0 {LogError("字段列表不能为空")return false, ""}var fieldDefs []stringfor _, col := range table.Columns {if col.Name == "" {LogError("字段名不能为空")return false, ""}def := col.Name + " " + string(col.Type) // 注意:col.Type 是 ColumnItemType,需转为 string// 处理长度(仅对支持长度的类型)if len(col.Length) > 0 && (col.Type == VARCHAR || col.Type == DECIMAL) {// 可以根据 Type 判断是否支持 Length,例如只对 VARCHAR 和 DECIMAL 生效def += "(" + col.Length + ")"}// 添加约束if col.NotNull {def += " NOT NULL"}if col.Unique {def += " UNIQUE"}if col.Primary {def += " PRIMARY KEY"}if col.Default != "" {// 判断是否需要为 DEFAULT 值加引号if col.Type == TEXT || col.Type == VARCHAR {def += fmt.Sprintf(" DEFAULT '%s'", EscapeString(col.Default))} else {def += " DEFAULT " + col.Default}}fieldDefs = append(fieldDefs, def)}// 拼接完整 SQLsql := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s);",table.Name,strings.Join(fieldDefs, ", "),)return true, sql
}// EscapeString 是一个假设的函数,用于转义SQL字符串中的特殊字符
func EscapeString(s string) string {// 实现对字符串s中单引号等特殊字符的转义return strings.ReplaceAll(s, "'", "''") // 示例:转义单引号
}// 辅助函数:判断 DEFAULT 是否需要加引号
func isStringDefault(defaultValue string) bool {// 尝试将 defaultValue 转换为数字或 NULL_, err1 := strconv.ParseFloat(defaultValue, 64)_, err2 := strconv.ParseBool(defaultValue)// 如果 defaultValue 可以转换为数字或布尔值,或者它是 "NULL",则不需要加引号return !(err1 == nil || err2 == nil || strings.ToUpper(defaultValue) == "NULL")
}// 辅助函数:打印耗时
func TimeTrack(start time.Time, name string) {duration := time.Since(start)ms := float64(duration) / float64(time.Millisecond)LogInfo("📊 %s 执行耗时: %.2f ms\n", name, ms)
}// 定义支持 "MM/DD/YYYY HH:MM:SS" 格式的时间类型
type CustomTime struct {time.Time
}// 实现 UnmarshalJSON 方法,解析 JSON 字符串为 CustomTime 类型
func (ct *CustomTime) UnmarshalJSON(data []byte) error {s := string(data)if len(s) >= 2 && s[0] == '"' && s[len(s)-1] == '"' {s = s[1 : len(s)-1]}// 支持的格式列表formats := []string{"01/02/2006 15:04:05","2006-01-02T15:04:05Z07:00","2006-01-02T15:04:05","2006-01-02 15:04:05",time.RFC3339,time.DateTime,}for _, layout := range formats {t, err := time.Parse(layout, s)if err == nil {ct.Time = treturn nil}}return fmt.Errorf("无法解析时间 '%s',支持的格式: MM/DD/YYYY HH:MM:SS 或 ISO8601", s)
}// ==================================封装打印log========================================
const (Red = "31"Green = "32"Yellow = "33"Blue = "34"Purple = "35"Cyan = "36"White = "37"
)// PrintColor 打印指定颜色的文本
// colorCode: ANSI 颜色码
// format: 格式化字符串,如 "创建%s表成功"
// args: 格式化参数
func LogColor(colorCode string, format string, args ...interface{}) {// \033[颜色码m + 文本 + \033[0m(重置)colored := fmt.Sprintf("\033[%sm%s\033[0m", colorCode, fmt.Sprintf(format, args...))fmt.Println(colored)
}func LogError(format string, args ...interface{}) {LogColor(Red, format, args...)
}
func LogInfo(format string, args ...interface{}) {LogColor(White, format, args...)
}
func LogWarning(format string, args ...interface{}) {LogColor(Yellow, format, args...)
}
func LogSuccess(format string, args ...interface{}) {LogColor(Green, format, args...)
}//==================================封装打印log END========================================
redis_model相关代码在之前的内容里。
四、redis和代码内存对比
1.查询接口添加了分页查询功能,同时对应使用redis内存和代码内存分别创建了不同接口。
2.测试redis和代码内存分别作为缓存的函数调用时间,发现一个有趣的现象。
每次测试都重启服务,且在启动时清除了redis所有的数据,具体结果如下图,不论是redis还是代码内存,都是第一次调用的接口最耗时,之后再次调用另一个接口时间就会大大缩短。我的测试数据库表里存了5w多条数据。
3.缓存使用,就是最简单的方式。查询接口返回数据前->将数据缓存下来->下次再次触发接口->如果参数一致->则直接从缓存中匹配->如果有匹配项->直接返回缓存数据。
使用触发器,实现当数据库表变化时,清空对应表的缓存数据。
五、碎碎念
1.学数据库的触发器搞的身心疲惫,没有人带,纯问AI、查资料,搞了整整2天才跑通。
2.下一篇应该会整理代码,把现在一坨一坨的代码,封装整理下。