go多线程压测监控
实现了
- go多协程压力测试
- 实现了Monitor,异步统计qps、时延、cpu(client端)等指标,周期printStat。只需要把单条执行func传给Monitor即可
- 命令行传参
- ctrl+c之后正常退出
- (mock cpu 占用)
代码见 https://gitee.com/bbjg001/golearning/tree/master/others/stress_monitor
执行
go run *.go -action w -routine 5 -duration 30s
效果
更多的
可以把收集到的指标暴露到/metrics
接口,这样就可以实现外部采集指标:提供给prometheus收集,再结合grafana实现可视化监控展示
附
util.go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"os"
	"sync"
	"sync/atomic"
	"time"

	"github.com/rcrowley/go-metrics"
	"github.com/shirou/gopsutil/v3/process"
)

// SCMonitor drives a stress test from the client side: it records every
// request reported through RecordRequest and periodically prints QPS, the
// CPU usage of the current process and latency statistics.
type SCMonitor struct {
	// The counters below are updated with sync/atomic. They are kept at the
	// very start of the struct so that they are 64-bit aligned on 32-bit
	// platforms as well.
	totalRequests  uint64
	failedRequests uint64

	registry     metrics.Registry
	interval     time.Duration
	stopChan     chan struct{}
	qpsCounter   metrics.Counter
	latencyTimer metrics.Timer

	stopOnce sync.Once      // makes Stop safe to call more than once
	wg       sync.WaitGroup // tracks the reporter goroutine started by Start

	reportMu    sync.Mutex // serialises PrintReport and guards lastCompute
	lastCompute time.Time
}

// NewSCMonitor returns a monitor that prints one report line every interval
// once Start has been called.
func NewSCMonitor(interval time.Duration) *SCMonitor {
	return &SCMonitor{
		registry:     metrics.NewRegistry(),
		interval:     interval,
		stopChan:     make(chan struct{}),
		qpsCounter:   metrics.NewCounter(),
		latencyTimer: metrics.NewTimer(),
	}
}

// register adds metric to the registry under name and reports a failure
// instead of silently dropping it.
func (m *SCMonitor) register(name string, metric interface{}) {
	if err := m.registry.Register(name, metric); err != nil {
		fmt.Printf("Failed to register metric %s: %v\n", name, err)
	}
}

// Start registers the metrics and launches the background goroutine that
// prints the table header followed by one report line per interval. The
// goroutine runs until Stop is called.
func (m *SCMonitor) Start() {
	m.register("requests.qps", m.qpsCounter)
	m.register("requests.latency", m.latencyTimer)

	// CPU usage is stored as percent*100 so that two decimals survive the
	// integer gauge.
	cpuGauge := metrics.NewGauge()
	m.register("system.cpu", cpuGauge)

	m.reportMu.Lock()
	m.lastCompute = time.Now()
	m.reportMu.Unlock()

	m.wg.Add(1)
	go func() {
		defer m.wg.Done()

		// If the process handle cannot be obtained, keep reporting QPS and
		// latency and simply leave the CPU gauge untouched.
		proc, err := process.NewProcess(int32(os.Getpid()))
		if err != nil {
			fmt.Printf("Failed to init process monitor: %v\n", err)
			proc = nil
		}

		ticker := time.NewTicker(m.interval)
		defer ticker.Stop()

		fmt.Printf("%-12s%-30s%-10s%-16s%-16s%-16s%-16s\n",
			"time", "qps", "cpu", "latency999", "latency99", "latency9", "latencyMean")

		for {
			select {
			case <-m.stopChan:
				return
			case <-ticker.C:
				// Refresh the CPU gauge, then print the periodic report.
				if proc != nil {
					if percent, err := proc.Percent(0); err == nil {
						cpuGauge.Update(int64(percent * 100))
					}
				}
				m.PrintReport()
			}
		}
	}()
}

// Monitor calls work in a loop until ctx is done and records the duration
// and outcome returned by every call. It blocks, so callers normally run it
// in its own goroutine.
func (m *SCMonitor) Monitor(ctx context.Context, work func() (duration time.Duration, succeed bool)) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
			duration, succeed := work()
			m.RecordRequest(duration, succeed)
		}
	}
}

// RecordRequest accounts for one finished request with the given latency.
// Requests with succeed == false are additionally counted as failed.
func (m *SCMonitor) RecordRequest(latency time.Duration, succeed bool) {
	atomic.AddUint64(&m.totalRequests, 1)
	if !succeed {
		atomic.AddUint64(&m.failedRequests, 1)
	}
	m.qpsCounter.Inc(1)
	m.latencyTimer.Update(latency)
}

// PrintReport prints one table line: the current time, the QPS since the
// previous report (plus the total request count), the CPU gauge and the
// p99.9/p99/p90/mean latencies in milliseconds.
func (m *SCMonitor) PrintReport() {
	m.reportMu.Lock()
	defer m.reportMu.Unlock()

	fmt.Printf("%-12s", time.Now().Format("15:04:05"))

	// QPS over the window since the previous report.
	if qps := m.registry.Get("requests.qps"); qps != nil {
		counter := qps.(metrics.Counter)
		// Subtract exactly what was read instead of calling Clear, so that
		// requests recorded between the read and the reset are not lost.
		count := counter.Count()
		counter.Dec(count)

		// Guard against a zero-length window, which would yield NaN/Inf.
		var rate float64
		if elapsed := time.Since(m.lastCompute).Seconds(); elapsed > 0 {
			rate = float64(count) / elapsed
		}
		qpsStr := fmt.Sprintf("%.1f (Total: %d)", rate, atomic.LoadUint64(&m.totalRequests))
		fmt.Printf("%-30s", qpsStr)
	}
	m.lastCompute = time.Now()

	// CPU usage; the gauge holds percent*100.
	if cpu := m.registry.Get("system.cpu"); cpu != nil {
		cpuStr := fmt.Sprintf("%.1f%%\t", float64(cpu.(metrics.Gauge).Value())/100)
		fmt.Printf("%-10s", cpuStr)
	}

	// Latency statistics, converted from nanoseconds to milliseconds.
	if timer := m.registry.Get("requests.latency"); timer != nil {
		t := timer.(metrics.Timer)
		fmt.Printf("%-16.2f%-16.2f%-16.2f%-16.2f\n",
			t.Percentile(0.999)/1000000, t.Percentile(0.99)/1000000,
			t.Percentile(0.9)/1000000, t.Mean()/1000000)
	}
}

// Stop shuts down the reporter goroutine, waits for it to exit and then
// prints a final report. Calling Stop more than once is a no-op.
func (m *SCMonitor) Stop() {
	m.stopOnce.Do(func() {
		close(m.stopChan)
		// Wait for the reporter so that the final line cannot interleave
		// with, or be followed by, a periodic report.
		m.wg.Wait()
		m.PrintReport() // final report
	})
}

// Simulate CPU load.
// mockCpuLoad burns CPU on the calling goroutine for about durationSecond
// seconds. Each cycle spins for loadPercentage milliseconds and then sleeps
// for (100 - loadPercentage) milliseconds.
func mockCpuLoad(loadPercentage int, durationSecond int) {
	busy := time.Duration(loadPercentage) * time.Millisecond
	idle := time.Duration(100-loadPercentage) * time.Millisecond

	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(durationSecond)*time.Second)
	defer cancel()

	// The deadline is only checked between cycles, so a cycle that has
	// already started always runs to completion.
	for ctx.Err() == nil {
		// Busy phase: throwaway arithmetic until the work slice is used up.
		for begin := time.Now(); time.Since(begin) < busy; {
			_ = rand.Intn(1000) * rand.Intn(1000)
		}
		// Idle phase.
		time.Sleep(idle)
	}
}
main.go
package main

import (
	"context"
	"flag"
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

var (
	testCfg = TConfig{}
	values  = make([]string, 1000)
	baseVal string
)

// TConfig holds the command-line parameters of a test run.
type TConfig struct {
	routine  int           // number of concurrent worker goroutines
	action   string        // "w" for write, "r" for read
	duration time.Duration // how long the test runs
}

// String renders the configuration as the multi-line banner printed at the
// start and at the end of a run.
func (c TConfig) String() string {
	return "=============测试参数\n" +
		fmt.Sprintf("routine: %d\n", c.routine) +
		fmt.Sprintf("action: %s\n", c.action) +
		fmt.Sprintf("duration: %v\n", c.duration)
}

// parseFlags binds the command-line flags to testCfg and parses them. It is
// called from main rather than from init: parsing flags during package
// initialisation runs before the testing package has registered its own
// flags and therefore breaks `go test`.
func parseFlags() {
	flag.IntVar(&testCfg.routine, "routine", 1, "线程数")
	flag.StringVar(&testCfg.action, "action", "r", "action, w|r")
	flag.DurationVar(&testCfg.duration, "duration", time.Minute, "测试时长")
	flag.Parse()
}

// doWrite simulates one write request: it sleeps for a random time below
// 10ms and fails with a probability of 0.2%. It returns the measured
// duration and whether the request succeeded. uid is unused and only
// demonstrates how arguments are passed through.
func doWrite(uid int64) (time.Duration, bool) {
	d := int(rand.Float64() * 10000)
	startTime := time.Now()
	time.Sleep(time.Microsecond * time.Duration(d))
	// The draw models the failure rate, so success is its negation.
	failed := rand.Float64() < 0.002
	return time.Since(startTime), !failed
}

// doRead simulates one read request: it sleeps for a random time below 5ms
// and fails with a probability of 0.01%. It returns the measured duration
// and whether the request succeeded. uid is unused.
func doRead(uid int64) (time.Duration, bool) {
	d := int(rand.Float64() * 5000)
	startTime := time.Now()
	time.Sleep(time.Microsecond * time.Duration(d))
	// The draw models the failure rate, so success is its negation.
	failed := rand.Float64() < 0.0001
	return time.Since(startTime), !failed
}

// main parses the flags, starts the monitor and the requested number of
// workers, and runs until the configured duration elapses or SIGINT/SIGTERM
// is received. It then waits for the workers and prints the configuration
// again, followed by the monitor's final report.
func main() {
	parseFlags()

	// Validate the action before anything is started, so that a bad flag
	// does not leave goroutines and the monitor half running.
	var work func(uid int64) (time.Duration, bool)
	switch testCfg.action {
	case "w":
		work = doWrite
	case "r":
		work = doRead
	default:
		fmt.Fprintf(os.Stderr, "unsupport param action: %s\n", testCfg.action)
		os.Exit(2)
	}

	fmt.Println(testCfg.String())
	fmt.Println("可Ctrl+C退出")

	go mockCpuLoad(15, 12) // simulate CPU load

	start := time.Now()
	ctx, cancel := context.WithTimeout(context.Background(), testCfg.duration)
	defer cancel()

	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(signalCh)

	monitor := NewSCMonitor(3 * time.Second)
	monitor.Start()
	defer monitor.Stop()

	var workers sync.WaitGroup
	for i := 0; i < testCfg.routine; i++ {
		// Give every worker its own copy; before Go 1.22 the loop variable
		// is shared by all closures.
		uid := int64(i)
		workers.Add(1)
		go func() {
			defer workers.Done()
			// The monitor only needs a context that ends the test and a
			// function that performs a single request.
			monitor.Monitor(ctx, func() (time.Duration, bool) {
				return work(uid)
			})
		}()
	}

	select {
	case <-ctx.Done():
		fmt.Println("测试完成, 正在退出 ...")
	case <-signalCh: // termination signal received: shut down gracefully
		testCfg.duration = time.Since(start)
		fmt.Println("收到终止信号, 正在退出 ...")
		cancel()
	}

	// Let in-flight requests finish so that the final report is complete.
	workers.Wait()
	fmt.Println(testCfg.String())
}