【kafka】Implementing a Distributed Masscan Task Scheduler in Golang
Requirements:
Deliver two programs: a command-line client (arguments parsed with flag) and a server.
The command-line client accepts the target IP or IP range, ports, and scan bandwidth as flags, then pushes the task message to Kafka.
The server:
- Receives scan task messages from a Kafka consumer.
- Launches masscan to run the probe, collects progress and result information, writes progress to Redis, and writes results to Kafka.
- Task launching, Kafka access, and the overall workflow must be wrapped in reusable components.
- With 2 server instances running, two tasks for different subnets submitted via the client must be distributed evenly across the 2 servers.
Test requirements:
- Start two server instances.
- Submit two tasks with different IPs via the command-line client.
- Check the server logs to confirm the two tasks were scanned evenly, one per server.
Prerequisites:
Install Docker.
Approach:
1. System architecture
A producer-consumer design:
- The command-line client acts as the producer, publishing scan tasks to Kafka.
- Two server instances act as consumers, fetching tasks from Kafka and executing them.
2. Key components
- Task representation:
  - Tasks are serialized as JSON (see the example right after this list) and include:
    - IP range (a single IP or CIDR)
    - port range
    - scan bandwidth limit
    - task status
    - progress information
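For reference, a task message as actually produced by the implementation below carries these fields — status and progress end up tracked in Redis rather than on the task itself (the UUID value is illustrative):

{
  "task_id": "<uuid>",
  "ip_range": "192.168.1.0/24",
  "ports": "80,443",
  "band_width": "1000"
}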
- Kafka design:
  - Create a topic (e.g. scan-tasks).
  - A single partition keeps tasks strictly ordered (or partition according to need — the load-balancing test below uses two partitions; a create command follows this list).
  - Use a consumer group to balance tasks across the two servers.
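If you prefer to create the topic explicitly with the desired partition count, instead of relying on auto-creation plus the --alter step used in the verification section, the stock kafka-topics tool can do it:

kafka-topics --create --bootstrap-server localhost:9092 \
  --topic scan-tasks --partitions 2 --replication-factor 1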
- Redis design:
  - Store task progress information.
  - Use a Hash per task to hold its progress percentage (a sketch follows this list).
  - Set a sensible TTL so the data doesn't grow without bound.
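A minimal sketch of that Hash-plus-TTL design with go-redis v8; the actual implementation further down stores a JSON blob under a plain String key instead, and the key layout and field names here are illustrative:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	key := "task:progress:<task-id>" // illustrative key layout
	// Store the progress percentage and status as Hash fields...
	if err := rdb.HSet(ctx, key, "progress", 42.5, "status", "running").Err(); err != nil {
		panic(err)
	}
	// ...and let the whole entry expire so stale tasks clean themselves up.
	rdb.Expire(ctx, key, time.Hour)

	fields, _ := rdb.HGetAll(ctx, key).Result()
	fmt.Println(fields) // e.g. map[progress:42.5 status:running]
}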
- Server-side load balancing:
  - Both servers join the same Kafka consumer group.
  - Kafka automatically spreads the partitions (and thus the tasks) evenly across the two consumers.
3. Execution flow
- Client flow:
  - Parse the command-line flags (IP range, ports, bandwidth).
  - Validate the input format.
  - Create a Kafka producer.
  - Publish the task to the Kafka topic.
- Server flow:
  - Initialize the Kafka consumer (joining the consumer group).
  - Initialize the Redis connection.
  - Consume tasks in a loop:
    a. Fetch a task from Kafka.
    b. Set the task status in Redis to "running".
    c. Invoke masscan: build the masscan command line, start the masscan process, and monitor its output and exit status.
    d. Parse masscan's output as it arrives and update the progress in Redis.
    e. On completion: set the task status in Redis to "completed" and publish the full results to a second Kafka topic (e.g. scan-result).
4. Key technical points
- Masscan integration:
  - Start the masscan process with exec.Command.
  - Parse masscan's standard output and standard error as they stream.
  - Derive the scan progress from that output (a parsing sketch follows this list).
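A self-contained sketch of the progress-parsing idea, assuming masscan status lines of the shape "rate:  0.10-kpps, 10.55% done, waiting 5-secs, found=0"; the implementation later in this post skips live progress and only records 0% and 100%:

package main

import (
	"bufio"
	"fmt"
	"io"
	"regexp"
	"strconv"
	"strings"
)

// percentDone matches the "NN.NN% done" fragment of a masscan status line.
var percentDone = regexp.MustCompile(`([0-9]+(?:\.[0-9]+)?)% done`)

// scanCROrLF splits on '\r' as well as '\n', because masscan redraws its
// status line in place using carriage returns.
func scanCROrLF(data []byte, atEOF bool) (int, []byte, error) {
	for i, b := range data {
		if b == '\n' || b == '\r' {
			return i + 1, data[:i], nil
		}
	}
	if atEOF && len(data) > 0 {
		return len(data), data, nil
	}
	return 0, nil, nil
}

// watchProgress reports every percentage it can parse from masscan's stderr.
// In the real server the reader would come from cmd.StderrPipe().
func watchProgress(stderr io.Reader, report func(float64)) {
	sc := bufio.NewScanner(stderr)
	sc.Split(scanCROrLF)
	for sc.Scan() {
		if m := percentDone.FindStringSubmatch(sc.Text()); m != nil {
			if pct, err := strconv.ParseFloat(m[1], 64); err == nil {
				report(pct)
			}
		}
	}
}

func main() {
	sample := "rate:  0.10-kpps, 10.55% done, waiting 5-secs, found=0\r" +
		"rate:  0.10-kpps, 99.99% done, waiting 0-secs, found=3\r"
	watchProgress(strings.NewReader(sample), func(pct float64) {
		fmt.Printf("progress: %.2f%%\n", pct)
	})
}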
- Error handling:
  - Handle malformed IP input.
  - Handle masscan execution failures.
  - Handle Kafka/Redis connectivity problems.
- Logging:
  - Log server operations.
  - Log task state transitions.
  - Log errors.
5. Test plan
- Start two server instances.
- Submit two tasks for different subnets via the client.
- Observe:
  - the log output of both servers;
  - whether the tasks were split evenly (one task per server);
  - whether the scan progress updates correctly;
  - whether the final results come out correctly.
6. Possible extensions
- Task priority:
  - Add a priority field to the task.
  - Have the servers process tasks by priority.
- Task timeouts:
  - Add a timeout mechanism.
  - Reassign a task once it times out.
- Result storage:
  - Persist results to a database rather than only to Kafka.
- Horizontal scaling:
  - Design for adding more server instances (a sketch of the first two ideas follows this list).
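A hedged sketch of what the priority and timeout extensions could look like — the Priority and TimeoutSec fields and the wrapper below are illustrative, not part of the implementation in this post:

package task

import (
	"context"
	"time"
)

// Task extended with hypothetical scheduling fields.
type Task struct {
	TaskId     string `json:"task_id"`
	IPRange    string `json:"ip_range"`
	Ports      string `json:"ports"`
	BandWidth  string `json:"band_width"`
	Priority   int    `json:"priority"`    // higher runs first (hypothetical)
	TimeoutSec int    `json:"timeout_sec"` // per-task deadline (hypothetical)
}

// RunWithTimeout wraps a task handler with a per-task deadline. The scanner's
// exec.CommandContext already kills masscan when its context is cancelled, so
// a timed-out task can simply be retried or republished.
func RunWithTimeout(parent context.Context, t Task, run func(context.Context, Task) error) error {
	ctx, cancel := context.WithTimeout(parent, time.Duration(t.TimeoutSec)*time.Second)
	defer cancel()
	return run(ctx, t)
}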
This design yields a basic distributed scan-scheduling system: Kafka's message queue distributes the tasks, its consumer-group mechanism provides the load balancing, and Redis serves as the shared state store.
Implementation:
Project structure:
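The layout below is inferred from the import paths (week/v3/...); the exact placement of the two main packages is an assumption:

week/v3/
├── kafka/            # Sarama consumer/producer wrappers
├── progress/         # Redis progress tracker
├── scanner/          # masscan executor
├── task/             # task model and submission
├── main/
│   ├── server/       # server entry point
│   └── client/       # client entry point
└── docker-compose.yml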

kafka:
consumer
package kafka

import (
	"context"
	"errors"
	"fmt"
	"log"
	"sync"

	"github.com/IBM/sarama"
)

type MessageHandler func([]byte) error

type SaramaConsumer struct {
	client    sarama.ConsumerGroup
	handlers  map[string]MessageHandler
	ready     chan bool
	ctx       context.Context
	cancel    context.CancelFunc
	consuming sync.WaitGroup
	memberId  string
	groupId   string
}

func NewKafkaConsumer(brokers []string, groupId string, topic []string) (*SaramaConsumer, error) {
	config := sarama.NewConfig()
	config.Version = sarama.V2_5_0_0                      // pick a version matching your broker
	config.Consumer.Offsets.Initial = sarama.OffsetOldest // start from the earliest offset
	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
	client, err := sarama.NewConsumerGroup(brokers, groupId, config)
	if err != nil {
		return nil, fmt.Errorf("failed to create consumer: %v", err)
	}
	ctx, cancelFunc := context.WithCancel(context.Background())
	return &SaramaConsumer{
		client:   client,
		handlers: make(map[string]MessageHandler),
		ready:    make(chan bool),
		ctx:      ctx,
		cancel:   cancelFunc,
		groupId:  groupId,
	}, nil
}

func (sc *SaramaConsumer) RegisterHandler(topic string, handler MessageHandler) {
	sc.handlers[topic] = handler
}

func (sc *SaramaConsumer) StartConsuming(topics []string) {
	sc.consuming.Add(1)
	go func() {
		defer sc.consuming.Done()
		for {
			// Consume blocks until a rebalance or an error; loop to rejoin.
			if err := sc.client.Consume(sc.ctx, topics, sc); err != nil {
				if errors.Is(err, sarama.ErrClosedConsumerGroup) {
					return // normal shutdown
				}
				log.Printf("Error from consumer: %v", err)
			}
			if sc.ctx.Err() != nil {
				return
			}
			sc.ready = make(chan bool)
		}
	}()
	<-sc.ready
	log.Println("Sarama consumer up and running!...")
}

func (sc *SaramaConsumer) Setup(session sarama.ConsumerGroupSession) error {
	sc.memberId = session.MemberID()
	claims := session.Claims()
	log.Printf("Rebalance: Consumer [%s] SETUP - Assigned partitions: %v", sc.memberId, claims)
	close(sc.ready)
	return nil
}

func (sc *SaramaConsumer) Cleanup(session sarama.ConsumerGroupSession) error {
	claims := session.Claims()
	log.Printf("Rebalance: Consumer [%s] CLEANUP - Releasing partitions: %v", sc.memberId, claims)
	return nil
}

func (sc *SaramaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	log.Printf("Rebalance: Consumer [%s] STARTED consuming partition %d (offset %d)", sc.memberId, claim.Partition(), claim.InitialOffset())
	defer log.Printf("Rebalance: Consumer [%s] STOPPED consuming partition %d", sc.memberId, claim.Partition())
	for message := range claim.Messages() {
		if handler, ok := sc.handlers[message.Topic]; ok {
			if err := handler(message.Value); err != nil {
				log.Printf("handler error: %v", err)
			}
			session.MarkMessage(message, "")
		} else {
			log.Printf("no handler registered for topic %q", message.Topic)
		}
	}
	return nil
}

func (sc *SaramaConsumer) Close() error {
	sc.cancel()
	sc.consuming.Wait()
	return sc.client.Close()
}
producer
package kafka

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/IBM/sarama"
)

type SaramaProducer struct {
	producer sarama.SyncProducer
}

func NewSaramaProducer(brokers []string) (*SaramaProducer, error) {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll              // wait for all replicas to ack
	config.Producer.Retry.Max = 5                                 // retry count
	config.Producer.Return.Successes = true                       // required by SyncProducer
	config.Producer.Partitioner = sarama.NewRoundRobinPartitioner // round-robin partitioning
	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		return nil, fmt.Errorf("failed to create Sarama producer: %v", err)
	}
	return &SaramaProducer{producer: producer}, nil
}

func (sp *SaramaProducer) SendMessage(topic string, value interface{}) error {
	// The value is marshaled to JSON here; callers pass plain structs.
	jsonValue, err := json.Marshal(value)
	if err != nil {
		return fmt.Errorf("fail to marshal value: %v", err)
	}
	msg := sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(jsonValue),
	}
	partition, offset, err := sp.producer.SendMessage(&msg)
	if err != nil {
		return fmt.Errorf("fail to send message: %v", err)
	}
	log.Printf("Message sent to partition %d at offset %d\n", partition, offset)
	return nil
}

func (sp *SaramaProducer) Close() error {
	return sp.producer.Close()
}
main
server
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"week/v3/kafka"
	"week/v3/progress"
	"week/v3/scanner"
	"week/v3/task"
)

type Server struct {
	consumer        *kafka.SaramaConsumer
	producer        *kafka.SaramaProducer
	progressTracker *progress.ProgressTracker
	scanner         *scanner.MasscanExecutor
}

func NewServer(brokers []string, groupId string, topic []string, redisAddr string) (*Server, error) {
	consumer, err := kafka.NewKafkaConsumer(brokers, groupId, topic)
	if err != nil {
		return nil, fmt.Errorf("failed to create Kafka consumer: %w", err)
	}
	producer, err := kafka.NewSaramaProducer(brokers)
	if err != nil {
		return nil, fmt.Errorf("failed to create Kafka producer: %w", err)
	}
	tracker := progress.NewProgressTracker(redisAddr)
	return &Server{
		consumer:        consumer,
		producer:        producer,
		progressTracker: tracker,
		scanner:         scanner.NewMasscanExecutor(),
	}, nil
}

func (s *Server) Start(ctx context.Context) {
	s.consumer.RegisterHandler("scan-tasks", func(msg []byte) error {
		var t task.Task
		if err := json.Unmarshal(msg, &t); err != nil {
			return fmt.Errorf("failed to unmarshal task: %v", err)
		}
		log.Printf("Received task: %+v\n", t)
		// Mark the task as started.
		if err := s.progressTracker.UpdateProgress(ctx, t.TaskId, 0); err != nil {
			return fmt.Errorf("failed to update progress: %v", err)
		}
		// Run the masscan scan.
		resultChan, errChan := s.scanner.Execute(ctx, t.IPRange, t.Ports, t.BandWidth)
		log.Println("Masscan Execute called")
		select {
		case results := <-resultChan:
			log.Printf("Scan complete. %d results.\n", len(results))
			for _, result := range results {
				scanResult := struct {
					TaskID string `json:"task_id"`
					scanner.ScanResult
					Timestamp int64 `json:"timestamp"`
				}{
					TaskID:     t.TaskId,
					ScanResult: result,
					Timestamp:  time.Now().Unix(),
				}
				// Publish to the scan-result topic. SendMessage marshals the
				// value itself, so pass the struct rather than pre-encoded
				// JSON (pre-encoding here would double-encode the payload).
				if err := s.producer.SendMessage("scan-result", scanResult); err != nil {
					log.Printf("Failed to send scan result to Kafka: %v", err)
				} else {
					log.Printf("Sent scan result: %+v", scanResult)
				}
			}
			// Mark the task as complete.
			if err := s.progressTracker.UpdateProgress(ctx, t.TaskId, 100); err != nil {
				return fmt.Errorf("failed to update progress: %v", err)
			}
		case err := <-errChan:
			log.Printf("Scan error: %v\n", err)
			// Mark the task as failed.
			if err := s.progressTracker.UpdateProgress(ctx, t.TaskId, 0); err != nil {
				return fmt.Errorf("failed to update progress: %v", err)
			}
			return fmt.Errorf("processing task failed: %v", err)
		}
		return nil
	})

	// Start consuming from Kafka.
	go s.consumer.StartConsuming([]string{"scan-tasks"})
	log.Println("Server is running and waiting for tasks...")

	// Block until the shutdown signal arrives.
	<-ctx.Done()
	log.Println("Shutting down...")
	if err := s.consumer.Close(); err != nil {
		log.Printf("Error closing consumer: %v", err)
	}
	if err := s.producer.Close(); err != nil {
		log.Printf("Error closing producer: %v", err)
	}
	if err := s.progressTracker.Close(); err != nil {
		log.Printf("Error closing progress tracker: %v", err)
	}
	log.Println("Server shutdown complete")
}

func main() {
	if len(os.Args) < 2 {
		log.Fatal("Usage: server <consumer-group-id>")
	}
	groupID := os.Args[1]
	brokers := []string{"localhost:9092"}
	redisAddr := "localhost:6379"
	server, err := NewServer(brokers, groupID, []string{"scan-tasks"}, redisAddr)
	if err != nil {
		log.Fatalf("Failed to create server: %v", err)
	}
	// Cancel the context on SIGINT/SIGTERM.
	ctx, cancel := context.WithCancel(context.Background())
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigChan
		cancel()
	}()
	server.Start(ctx)
}
client
package main

import (
	"flag"
	"fmt"
	"log"
	"strings"

	"week/v3/task"
)

func main() {
	ipRange := flag.String("ip", "", "IP address or CIDR range (comma-separated for multiple)")
	ports := flag.String("ports", "", "port range, e.g. 80,443")
	bandwidth := flag.String("bandwidth", "1000", "scan rate (masscan --rate, packets/second)")
	kafkaBroker := flag.String("kafka", "localhost:9092", "Kafka broker address")
	topic := flag.String("topic", "scan-tasks", "Kafka topic")
	flag.Parse()
	if *ipRange == "" || *ports == "" {
		fmt.Println("both -ip and -ports are required")
		flag.PrintDefaults()
		return
	}
	brokers := strings.Split(*kafkaBroker, ",")
	taskManager, err := task.NewTaskManager(*topic, brokers)
	if err != nil {
		log.Fatalf("Failed to create task manager: %v", err)
	}
	defer taskManager.Close()
	// SubmitTask returns one ID per comma-separated IP range.
	taskIDs, err := taskManager.SubmitTask(*ipRange, *ports, *bandwidth)
	if err != nil {
		log.Fatalf("Failed to submit task: %v", err)
	}
	fmt.Printf("Tasks submitted successfully. Task IDs: %v\n", taskIDs)
}
progress
package progress

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"
)

type Progress struct {
	TaskID    string  `json:"task_id"`
	Progress  float64 `json:"progress"`
	Status    string  `json:"status"`
	Timestamp int64   `json:"timestamp"`
}

type ProgressTracker struct {
	redisClient *redis.Client
}

func NewProgressTracker(redisAddr string) *ProgressTracker {
	return &ProgressTracker{
		redisClient: redis.NewClient(&redis.Options{
			Addr: redisAddr,
		}),
	}
}

func (pt *ProgressTracker) UpdateProgress(ctx context.Context, taskID string, progress float64) error {
	p := Progress{
		TaskID:    taskID,
		Progress:  progress,
		Timestamp: time.Now().Unix(),
	}
	jsonData, err := json.Marshal(p)
	if err != nil {
		return fmt.Errorf("failed to marshal progress: %v", err)
	}
	// One-hour TTL so stale progress entries expire on their own.
	if err := pt.redisClient.Set(ctx, taskID, string(jsonData), time.Hour).Err(); err != nil {
		return fmt.Errorf("failed to update progress tracker: %v", err)
	}
	return nil
}

func (pt *ProgressTracker) Close() error {
	return pt.redisClient.Close()
}
scanner
package scanner

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"os/exec"
)

type ScanResult struct {
	IP     string `json:"ip"`
	Port   string `json:"port"`
	Status string `json:"status"`
}

type MasscanExecutor struct{}

func NewMasscanExecutor() *MasscanExecutor {
	return &MasscanExecutor{}
}

func (me *MasscanExecutor) Execute(ctx context.Context, ipRange, ports, bandWidth string) (<-chan []ScanResult, <-chan error) {
	resultChan := make(chan []ScanResult)
	errorChan := make(chan error)
	go func() {
		defer close(resultChan)
		defer close(errorChan)
		args := []string{
			"-p", ports,
			"--rate", bandWidth,
			"-oJ", "-", // JSON output to stdout
			ipRange,
		}
		cmd := exec.CommandContext(ctx, "masscan", args...)
		// CombinedOutput also captures masscan's status lines on stderr;
		// they are filtered out below.
		output, err := cmd.CombinedOutput()
		if err != nil {
			errorChan <- fmt.Errorf("masscan command error: %v, output: %s", err, string(output))
			return
		}
		// Print the raw output for debugging.
		fmt.Printf("masscan output:\n%s\n", string(output))
		scanner := bufio.NewScanner(bytes.NewReader(output))
		var results []ScanResult
		for scanner.Scan() {
			line := bytes.TrimSpace(scanner.Bytes())
			// masscan's -oJ output wraps records in a JSON array: skip the
			// "[" / "]" delimiter lines and any non-JSON banner text, and
			// strip the trailing comma from each record line.
			if len(line) == 0 || line[0] != '{' {
				continue
			}
			line = bytes.TrimSuffix(line, []byte(","))
			var r struct {
				IP    string `json:"ip"`
				Ports []struct {
					Port   int    `json:"port"`
					Status string `json:"status"`
				} `json:"ports"`
			}
			if err := json.Unmarshal(line, &r); err != nil {
				errorChan <- fmt.Errorf("failed to unmarshal line %q: %v", line, err)
				return
			}
			for _, p := range r.Ports {
				results = append(results, ScanResult{
					IP:     r.IP,
					Port:   fmt.Sprintf("%d", p.Port),
					Status: p.Status,
				})
			}
		}
		if err := scanner.Err(); err != nil {
			errorChan <- fmt.Errorf("scanner error: %v", err)
			return
		}
		resultChan <- results
	}()
	return resultChan, errorChan
}
task
package task

import (
	"encoding/json"
	"fmt"
	"strings"

	"github.com/IBM/sarama"
	"github.com/google/uuid"
)

type Task struct {
	TaskId    string `json:"task_id"`
	IPRange   string `json:"ip_range"`
	Ports     string `json:"ports"`
	BandWidth string `json:"band_width"`
}

type TaskManager struct {
	producer sarama.SyncProducer
	topic    string
}

func NewTaskManager(topic string, brokers []string) (*TaskManager, error) {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 5
	config.Producer.Return.Successes = true
	config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		return nil, err
	}
	return &TaskManager{producer: producer, topic: topic}, nil
}

func (tm *TaskManager) SubmitTask(ipRange, ports, bandWidth string) ([]string, error) {
	ipList := strings.Split(ipRange, ",")
	var taskIDs []string
	for _, ip := range ipList {
		task := &Task{
			TaskId:    uuid.New().String(),
			IPRange:   ip, // one IP/CIDR per task, so tasks spread across partitions
			Ports:     ports,
			BandWidth: bandWidth,
		}
		jsonTask, err := json.Marshal(task)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal task: %v", err)
		}
		msg := sarama.ProducerMessage{
			Topic: tm.topic,
			Value: sarama.ByteEncoder(jsonTask),
		}
		if _, _, err = tm.producer.SendMessage(&msg); err != nil {
			return nil, fmt.Errorf("failed to send message: %v", err)
		}
		taskIDs = append(taskIDs, task.TaskId)
	}
	return taskIDs, nil
}

func (tm *TaskManager) Close() error {
	return tm.producer.Close()
}
docker-compose
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      TZ: Asia/Shanghai
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      TZ: Asia/Shanghai
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  redis:
    image: redis:7.2
    container_name: redis
    ports:
      - "6379:6379"
    environment:
      TZ: Asia/Shanghai
    volumes:
      - redis-data:/data

volumes:
  redis-data:
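With Docker installed, bring the whole stack up from the directory containing this file before starting the servers:

docker compose up -d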
Verification
Start two servers and one client. First, get a shell inside the Kafka container started by Docker:
docker exec -it kafka bash
List the topics: before the server and client have run, Kafka has no topics; once they start, three topics get registered.
scan-tasks holds the scan tasks, scan-result holds the scan results, and __consumer_offsets holds the consumer offsets.
Because the two servers must share the load, the topic needs two partitions:
kafka-topics --alter --bootstrap-server localhost:9092 --topic scan-tasks --partitions 2
kafka-topics --describe --bootstrap-server localhost:9092 --topic scan-tasks
Start two servers from the command line.
Launch one client to submit the tasks, as shown below.
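For example — the go run paths assume the inferred layout above, and the consumer-group name is arbitrary as long as both servers use the same one:

# terminals 1 and 2: two servers in the same consumer group
go run ./main/server scan-group
go run ./main/server scan-group

# terminal 3: two comma-separated ranges become two separate tasks
go run ./main/client -ip 192.168.1.0/24,192.168.2.0/24 -ports 80,443 -bandwidth 1000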
The two servers consume the messages evenly.
If you're curious, you can also inspect the scan results in Kafka and the progress entries written to Redis.