
[kafka] A Distributed Masscan Task Scheduling System in Golang

Requirements:

        Deliver two programs: a command-line program (arguments parsed with the flag package) and a server program.

        The command-line program accepts the target IP or IP range, ports, and scan bandwidth as command-line flags, then pushes the task message to Kafka.

Server program:

  1. Receives scan task messages as a Kafka consumer
  2. Launches the probe by invoking masscan, collects progress and result information, writes progress to Redis, and writes results to Kafka.
  3. Task launching, Kafka access, and the overall workflow must be encapsulated.
  4. With 2 server instances running, two tasks for different subnets submitted from the command-line program must be distributed evenly across the 2 servers.

Test requirements:

  1. Start two server programs.
  2. Submit two tasks with different IPs via the command-line program.
  3. Check the server logs to confirm the two tasks were scanned evenly.

     

Prerequisites:

        Install Docker

Approach:

1. System Architecture

A producer-consumer pattern:

  • The command-line client acts as the producer, publishing scan tasks to Kafka
  • Two server instances act as consumers, fetching tasks from Kafka and executing them

2. Key Component Design

  1. Task representation

    • Represent a scan task as JSON, containing:
      • IP range (a single IP or CIDR notation)
      • Port range
      • Scan bandwidth limit
      • Task status
      • Progress information
  2. Kafka design

    • Create a topic (e.g. scan-tasks)
    • Choose the partition count deliberately: messages within a partition stay ordered, but a single-partition topic would feed only one member of a consumer group, so load-balancing two servers requires at least two partitions
    • Use a consumer group to load-balance the two servers
  3. Redis design

    • Store task progress information
    • Store each task's progress record under its task ID (the implementation below uses a String key holding a JSON document)
    • Set an appropriate TTL so the data does not grow unbounded
  4. Server-side load balancing

    • Both servers join the same Kafka consumer group
    • Kafka automatically distributes the tasks evenly between the two consumers

3. Execution Flow

  1. Client flow

    • Parse the command-line flags (IP range, ports, bandwidth)
    • Validate the input format
    • Create a Kafka producer
    • Publish the task to the Kafka topic
  2. Server flow

    • Initialize the Kafka consumer (join the consumer group)
    • Initialize the Redis connection
    • Consume tasks in a loop:
      a. Fetch a task from Kafka
      b. Update the task status in Redis to "running"
      c. Invoke masscan to perform the scan:
        • Build the masscan command-line arguments
        • Start the masscan process
        • Monitor the process output and exit status
      d. Parse the masscan output in real time and update the progress in Redis
      e. When the scan completes:
        • Update the task status in Redis to "completed"
        • Publish the full results to another Kafka topic (e.g. scan-result)

4. Key Technical Points

  1. Masscan integration

    • Start the masscan process with exec.Command
    • Parse masscan's standard output and standard error in real time
    • Compute the scan progress from the output
  2. Error handling

    • Handle invalid IP formats
    • Handle masscan execution failures
    • Handle Kafka/Redis connection problems
  3. Logging

    • Log server-side operations
    • Log task state transitions
    • Log errors

5. Test and Verification Plan

  1. Start two server instances
  2. Submit two tasks for different subnets with the client
  3. Observe:
    • the log output of both servers
    • whether the tasks are distributed evenly (one task per server)
    • whether the scan progress is updated correctly
    • whether the final results are produced correctly

6. Possible Extensions

  1. Task priority

    • Add a priority field to the task
    • Servers process tasks according to priority
  2. Task timeout

    • Add a task timeout mechanism
    • Re-assign a task after it times out
  3. Result storage

    • Consider writing results to a database rather than only to Kafka
  4. Horizontal scaling

    • Design for adding more server instances

This design implements a basic distributed scan-task scheduling system: Kafka's message queue handles task distribution, the consumer-group mechanism provides load balancing, and Redis serves as shared state storage.

Implementation:

        Project structure:


         kafka:
         consumer
package kafka

import (
	"context"
	"errors"
	"fmt"
	"log"
	"sync"

	"github.com/IBM/sarama"
)

type MessageHandler func([]byte) error

type SaramaConsumer struct {
	client    sarama.ConsumerGroup
	handlers  map[string]MessageHandler
	ready     chan bool
	ctx       context.Context
	cancel    context.CancelFunc
	consuming sync.WaitGroup
	memberId  string
	groupId   string
}

func NewKafkaConsumer(brokers []string, groupId string, topic []string) (*SaramaConsumer, error) {
	config := sarama.NewConfig()
	config.Version = sarama.V2_5_0_0                      // set to match the broker version
	config.Consumer.Offsets.Initial = sarama.OffsetOldest // start from the earliest offset
	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
	client, err := sarama.NewConsumerGroup(brokers, groupId, config)
	if err != nil {
		return nil, fmt.Errorf("failed to create consumer: %v", err)
	}
	ctx, cancelFunc := context.WithCancel(context.Background())
	return &SaramaConsumer{
		client:   client,
		handlers: make(map[string]MessageHandler),
		ready:    make(chan bool),
		ctx:      ctx,
		cancel:   cancelFunc,
		groupId:  groupId,
	}, nil
}

func (sc *SaramaConsumer) RegisterHandler(topic string, handler MessageHandler) {
	sc.handlers[topic] = handler
}

func (sc *SaramaConsumer) StartConsuming(topics []string) {
	sc.consuming.Add(1)
	go func() {
		defer sc.consuming.Done()
		for {
			if err := sc.client.Consume(sc.ctx, topics, sc); err != nil {
				if errors.Is(err, sarama.ErrClosedConsumerGroup) {
					return // normal shutdown
				}
				log.Printf("Error from consumer: %v", err)
			}
			if sc.ctx.Err() != nil {
				return
			}
			// Consume returns after each rebalance; re-arm the ready channel
			sc.ready = make(chan bool)
		}
	}()
	<-sc.ready
	log.Println("Sarama consumer up and running!...")
}

func (sc *SaramaConsumer) Setup(session sarama.ConsumerGroupSession) error {
	sc.memberId = session.MemberID()
	claims := session.Claims()
	log.Printf("Rebalance: Consumer [%s] SETUP - Assigned partitions: %v",
		sc.memberId, claims)
	close(sc.ready)
	return nil
}

func (sc *SaramaConsumer) Cleanup(session sarama.ConsumerGroupSession) error {
	claims := session.Claims()
	log.Printf("Rebalance: Consumer [%s] CLEANUP - Releasing partitions: %v",
		sc.memberId, claims)
	return nil
}

func (sc *SaramaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	log.Printf("Rebalance: Consumer [%s] STARTED consuming partition %d (offset %d)",
		sc.memberId, claim.Partition(), claim.InitialOffset())
	defer log.Printf("Rebalance: Consumer [%s] STOPPED consuming partition %d",
		sc.memberId, claim.Partition())
	for message := range claim.Messages() {
		if handler, ok := sc.handlers[message.Topic]; ok {
			if err := handler(message.Value); err != nil {
				log.Printf("Error from consumer: %v", err)
			}
			session.MarkMessage(message, "")
		} else {
			log.Printf("No handler registered for topic: %v", message.Topic)
		}
	}
	return nil
}

func (sc *SaramaConsumer) Close() error {
	sc.cancel()
	sc.consuming.Wait()
	return sc.client.Close()
}
   producer
package kafka

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/IBM/sarama"
)

type SaramaProducer struct {
	producer sarama.SyncProducer
}

func NewSaramaProducer(brokers []string) (*SaramaProducer, error) {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll              // wait for all replicas to ack
	config.Producer.Retry.Max = 5                                 // retry count
	config.Producer.Return.Successes = true                       // report successful deliveries
	config.Producer.Partitioner = sarama.NewRoundRobinPartitioner // round-robin partitioning
	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		return nil, fmt.Errorf("failed to create Sarama producer: %v", err)
	}
	return &SaramaProducer{producer: producer}, nil
}

func (sp *SaramaProducer) SendMessage(topic string, value interface{}) error {
	jsonValue, err := json.Marshal(value)
	if err != nil {
		return fmt.Errorf("fail to marshal value: %v", err)
	}
	msg := sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(jsonValue),
	}
	partition, offset, err := sp.producer.SendMessage(&msg)
	if err != nil {
		return fmt.Errorf("fail to send message: %v", err)
	}
	log.Printf("Message sent to partition %d at offset %d\n", partition, offset)
	return nil
}

func (sp *SaramaProducer) Close() error {
	return sp.producer.Close()
}
main
        server
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"week/v3/kafka"
	"week/v3/progress"
	"week/v3/scanner"
	"week/v3/task"
)

type Server struct {
	consumer        *kafka.SaramaConsumer
	producer        *kafka.SaramaProducer
	progressTracker *progress.ProgressTracker
	scanner         *scanner.MasscanExecutor
}

func NewServer(brokers []string, groupId string, topic []string, redisAddr string) (*Server, error) {
	consumer, err := kafka.NewKafkaConsumer(brokers, groupId, topic)
	if err != nil {
		return nil, fmt.Errorf("failed to create Kafka consumer: %w", err)
	}
	producer, err := kafka.NewSaramaProducer(brokers)
	if err != nil {
		return nil, fmt.Errorf("failed to create Kafka producer: %w", err)
	}
	tracker := progress.NewProgressTracker(redisAddr)
	return &Server{
		consumer:        consumer,
		producer:        producer,
		progressTracker: tracker,
		scanner:         scanner.NewMasscanExecutor(),
	}, nil
}

func (s *Server) Start(ctx context.Context) {
	s.consumer.RegisterHandler("scan-tasks", func(msg []byte) error {
		var t task.Task
		if err := json.Unmarshal(msg, &t); err != nil {
			return fmt.Errorf("failed to unmarshal task: %v", err)
		}
		log.Printf("Received task: %+v\n", t)
		// mark the task as started
		if err := s.progressTracker.UpdateProgress(ctx, t.TaskId, 0); err != nil {
			return fmt.Errorf("failed to update progress: %v", err)
		}
		// run the masscan scan
		resultChan, errChan := s.scanner.Execute(ctx, t.IPRange, t.Ports, t.BandWidth)
		log.Println("Masscan Execute called")
		select {
		case results := <-resultChan:
			log.Printf("Scan complete. %d results.\n", len(results))
			for _, result := range results {
				scanResult := struct {
					TaskID string `json:"task_id"`
					scanner.ScanResult
					Timestamp int64 `json:"timestamp"`
				}{
					TaskID:     t.TaskId,
					ScanResult: result,
					Timestamp:  time.Now().Unix(),
				}
				// publish to the scan-result topic; SendMessage marshals the
				// value itself, so pass the struct, not pre-marshaled bytes
				// (marshaling twice would base64-encode the payload)
				if err := s.producer.SendMessage("scan-result", scanResult); err != nil {
					log.Printf("Failed to send scan result to Kafka: %v", err)
				} else {
					log.Printf("Sent scan result: %+v", scanResult)
				}
			}
			// mark the task as completed
			if err := s.progressTracker.UpdateProgress(ctx, t.TaskId, 100); err != nil {
				return fmt.Errorf("failed to update progress: %v", err)
			}
		case err := <-errChan:
			log.Printf("Scan error: %v\n", err)
			// mark the task as failed
			if err := s.progressTracker.UpdateProgress(ctx, t.TaskId, 0); err != nil {
				return fmt.Errorf("failed to update progress: %v", err)
			}
			return fmt.Errorf("processing task failed: %v", err)
		}
		return nil
	})

	// start consuming from Kafka
	go s.consumer.StartConsuming([]string{"scan-tasks"})
	log.Println("Server is running and waiting for tasks...")

	// wait for the shutdown signal
	<-ctx.Done()
	log.Println("Shutting down...")
	if err := s.consumer.Close(); err != nil {
		log.Printf("Error closing consumer: %v", err)
	}
	if err := s.producer.Close(); err != nil {
		log.Printf("Error closing producer: %v", err)
	}
	if err := s.progressTracker.Close(); err != nil {
		log.Printf("Error closing progress tracker: %v", err)
	}
	log.Println("Server shutdown complete")
}

func main() {
	if len(os.Args) < 2 {
		log.Fatal("Usage: server <consumer-group-id>")
	}
	groupID := os.Args[1]
	brokers := []string{"localhost:9092"}
	redisAddr := "localhost:6379"
	server, err := NewServer(brokers, groupID, []string{"scan-tasks"}, redisAddr)
	if err != nil {
		log.Fatalf("Failed to create server: %v", err)
	}
	// catch interrupt signals
	ctx, cancel := context.WithCancel(context.Background())
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigChan
		cancel()
	}()
	server.Start(ctx)
}
client
package main

import (
	"flag"
	"fmt"
	"log"
	"strings"

	"week/v3/task"
)

func main() {
	ipRange := flag.String("ip", "", "IP address or CIDR range; comma-separated for multiple")
	ports := flag.String("ports", "", "port range, e.g. 80,443")
	bandwidth := flag.String("bandwidth", "1000", "scan rate")
	kafkaBroker := flag.String("kafka", "localhost:9092", "Kafka broker address")
	topic := flag.String("topic", "scan-tasks", "Kafka topic")
	flag.Parse()
	if *ipRange == "" || *ports == "" {
		fmt.Println("the -ip and -ports flags are required")
		flag.PrintDefaults()
		return
	}
	brokers := strings.Split(*kafkaBroker, ",")
	taskManager, err := task.NewTaskManager(*topic, brokers)
	if err != nil {
		log.Fatalf("Failed to create task manager: %v", err)
	}
	defer taskManager.Close()
	// SubmitTask returns one task ID per comma-separated IP range
	taskIDs, err := taskManager.SubmitTask(*ipRange, *ports, *bandwidth)
	if err != nil {
		log.Fatalf("Failed to submit task: %v", err)
	}
	fmt.Printf("Tasks submitted successfully. Task IDs: %v\n", taskIDs)
}
progress
package progress

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"
)

type Progress struct {
	TaskID    string  `json:"task_id"`
	Progress  float64 `json:"progress"`
	Status    string  `json:"status"`
	Timestamp int64   `json:"timestamp"`
}

type ProgressTracker struct {
	redisClient *redis.Client
}

func NewProgressTracker(redisAddr string) *ProgressTracker {
	return &ProgressTracker{
		redisClient: redis.NewClient(&redis.Options{
			Addr: redisAddr,
		}),
	}
}

func (pt *ProgressTracker) UpdateProgress(ctx context.Context, taskID string, progress float64) error {
	p := Progress{
		TaskID:    taskID,
		Progress:  progress,
		Timestamp: time.Now().Unix(),
	}
	jsonData, err := json.Marshal(p)
	if err != nil {
		return fmt.Errorf("failed to marshal progress: %v", err)
	}
	// one-hour TTL so progress records do not grow unbounded
	err = pt.redisClient.Set(ctx, taskID, string(jsonData), time.Hour).Err()
	if err != nil {
		return fmt.Errorf("failed to update progress tracker: %v", err)
	}
	return nil
}

func (pt *ProgressTracker) Close() error {
	return pt.redisClient.Close()
}
scanner
 
package scanner

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"os/exec"
)

type ScanResult struct {
	IP     string `json:"ip"`
	Port   string `json:"port"`
	Status string `json:"status"`
}

type MasscanExecutor struct{}

func NewMasscanExecutor() *MasscanExecutor {
	return &MasscanExecutor{}
}

func (me *MasscanExecutor) Execute(ctx context.Context, ipRange, ports, bandWidth string) (<-chan []ScanResult, <-chan error) {
	resultChan := make(chan []ScanResult)
	errorChan := make(chan error)
	go func() {
		defer close(resultChan)
		defer close(errorChan)
		args := []string{
			"-p", ports,
			"--rate", bandWidth,
			"-oJ", "-", // JSON results on stdout
			ipRange,
		}
		cmd := exec.CommandContext(ctx, "masscan", args...)
		output, err := cmd.CombinedOutput()
		if err != nil {
			errorChan <- fmt.Errorf("masscan command error: %v, output: %s", err, string(output))
			return
		}
		// print the raw output for debugging
		fmt.Printf("masscan output:\n%s\n", string(output))
		scanner := bufio.NewScanner(bytes.NewReader(output))
		var results []ScanResult
		for scanner.Scan() {
			line := scanner.Bytes()
			// skip non-JSON lines
			if len(line) == 0 || (line[0] != '{' && line[0] != '[') {
				continue
			}
			var r struct {
				IP    string `json:"ip"`
				Ports []struct {
					Port   int    `json:"port"`
					Status string `json:"status"`
				} `json:"ports"`
			}
			if err := json.Unmarshal(line, &r); err != nil {
				errorChan <- fmt.Errorf("failed to unmarshal line: %v", err)
				return
			}
			for _, p := range r.Ports {
				results = append(results, ScanResult{
					IP:     r.IP,
					Port:   fmt.Sprintf("%d", p.Port),
					Status: p.Status,
				})
			}
		}
		if err := scanner.Err(); err != nil {
			errorChan <- fmt.Errorf("scanner error: %v", err)
			return
		}
		resultChan <- results
	}()
	return resultChan, errorChan
}
task
package task

import (
	"encoding/json"
	"fmt"
	"strings"

	"github.com/IBM/sarama"
	"github.com/google/uuid"
)

type Task struct {
	TaskId    string `json:"task_id"`
	IPRange   string `json:"ip_range"`
	Ports     string `json:"ports"`
	BandWidth string `json:"band_width"`
}

type TaskManager struct {
	producer sarama.SyncProducer
	topic    string
}

func NewTaskManager(topic string, brokers []string) (*TaskManager, error) {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 5
	config.Producer.Return.Successes = true
	config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		return nil, err
	}
	return &TaskManager{producer: producer, topic: topic}, nil
}

func (tm *TaskManager) SubmitTask(ipRange, ports, bandWidth string) ([]string, error) {
	ipList := strings.Split(ipRange, ",")
	var taskIDs []string
	for _, ip := range ipList {
		task := &Task{
			TaskId:    uuid.New().String(),
			IPRange:   ip, // one IP range per task
			Ports:     ports,
			BandWidth: bandWidth,
		}
		jsonTask, err := json.Marshal(task)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal task: %v", err)
		}
		msg := sarama.ProducerMessage{
			Topic: tm.topic,
			Value: sarama.ByteEncoder(jsonTask),
		}
		_, _, err = tm.producer.SendMessage(&msg)
		if err != nil {
			return nil, fmt.Errorf("failed to send message: %v", err)
		}
		taskIDs = append(taskIDs, task.TaskId)
	}
	return taskIDs, nil
}

func (tm *TaskManager) Close() error {
	return tm.producer.Close()
}
docker-compose
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      TZ: Asia/Shanghai
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      TZ: Asia/Shanghai
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  redis:
    image: redis:7.2
    container_name: redis
    ports:
      - "6379:6379"
    environment:
      TZ: Asia/Shanghai
    volumes:
      - redis-data:/data

volumes:
  redis-data:

Verification

        Start two servers and one client. First, enter the Kafka container started by Docker:

docker exec -it kafka bash

List the topics in Kafka: before the server and client start there are no topics; once they start, three topics are registered.

scan-tasks holds the scan tasks, scan-result holds the scan results, and __consumer_offsets holds the consumer offsets.

Because the two servers must share the load, the topic needs two partitions:

kafka-topics --alter --bootstrap-server localhost:9092 --topic scan-tasks --partitions 2
kafka-topics --describe --bootstrap-server localhost:9092 --topic scan-tasks


Start two server instances from the command line.

Start one client and submit the tasks.

The two servers consume the messages evenly.

If you are interested, you can inspect the scan results and the progress written to Redis yourself.
