当前位置: 首页 > news >正文

自动化运维实验(二)---自动识别设备,并导出配置

 目录

一、实验拓扑

二、实验目的

三、实验步骤

实验思路:

代码部分:

四、实验结果:


一、实验拓扑

二、实验目的

ssh远程登录后,识别设备类型(华三、华为、锐捷、山石、飞塔、深信服等),再输入对应设备的命令进行配置导出

三、实验步骤

实验开始之前先搭好环境,测试无误再开始

实验思路:

利用ip.txt,存放需要登录的设备IP

再一台一台登录,更具设备独有的命令进行识别,再对配置进行导出保存。

代码部分:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
批量导出设备配置(改进版)
- 在探测/关闭分页阶段不保存任何输出
- 仅从首次成功的“导出配置”命令开始保存 raw 输出,并对其进行清理后保存 clean 输出
- 支持自动翻页、错误判断、厂商识别等
- 输出文件:output/<ip>_<vendor>_<ts>_raw.txt   (可选)output/<ip>_<vendor>_<ts>_clean.txt
配置项可在脚本顶部调整。
"""
import paramiko
import time
import re
import os
import sys
from getpass import getpass
from datetime import datetime# ---------- 配置区域(可按需修改) ----------
IP_FILE = "ip.txt"
OUTPUT_DIR = "output"
SSH_PORT = 22
CONNECT_TIMEOUT = 10
CMD_TIMEOUT = 6.0
READ_CHUNK = 65536# 是否只保存 clean(True)或同时保存 raw(False)
SAVE_CLEAN_ONLY = TrueVENDOR_KEYWORDS = {"huawei": ["huawei", "huawei vrp", "huawei Technologies", "Huawei"],"h3c": ["h3c", "h3c technologies", "Comware", "H3C"],"cisco": ["cisco", "ios", "cisco ios", "Cisco IOS Software"],"ruijie": ["ruijie", "ruijie networks", "rzos", "Ruijie"],"fortigate": ["fortigate", "fortios", "fortinet", "FortiGate"],"hillstone": ["hillstone", "hillstone networks", "shanshi", "HS"],"sangfor": ["sangfor", "深信服"],
}# 每个厂商尝试的“导出配置”命令列表
VENDOR_CONFIG_CMDS = {"cisco": ["terminal length 0", "show running-config"],"huawei": ["display current-configuration", "screen-length 0 temporary", "display this-configuration"],"h3c": ["display current-configuration", "display this-configuration", "screen-length 0 temporary"],"ruijie": ["display current-configuration", "show running-config", "screen-length 0 temporary"],"fortigate": ["show full-configuration", "get system status", "show running-config", "show"],"hillstone": ["display current-configuration", "show running-config", "terminal length 0"],"sangfor": ["display current-configuration", "show running-config"],# fallback"unknown": ["display current-configuration", "show running-config", "show full-configuration", "show"]
}# 一些通用的关闭分页命令(尝试但不将其直接视为导出成功)
PAGING_CMDS = ["terminal length 0","screen-length 0 temporary","screen-length disable","page 0","set cli pagination off","no page","undo page",
]# ---------- 正则/判定模式 ----------
_ERR_PAT = re.compile(r"unrecognized command|unknown command|invalid input|command not found|% Unrecognized|% Invalid|% Unknown|^%|[\^]\s*$",re.IGNORECASE,
)
_PAGING_PAT = re.compile(r"--More--|-- MORE --|--More--|\[More\]|Press any key|<--- More --->|--More--|More:|\(q\)|\-\-more\-\-",re.IGNORECASE,
)
_CONFIG_HINTS = re.compile(r"\b(hostname|sysname|interface|system-view|vlan|ip address|current-configuration|running-config|service password-encryption|ntp server|snmp-server|boot-image|bootrom|startup-config|device name)\b",re.IGNORECASE,
)# ---------- 辅助函数 ----------
def read_ip_file(path):ips = []if not os.path.exists(path):print("ip 文件不存在:", path)return ipswith open(path, "r", encoding="utf-8") as f:for ln in f:ln = ln.strip()if not ln or ln.startswith("#"):continueparts = [p.strip() for p in ln.split(",")]if len(parts) == 1:ips.append((parts[0], None, None))elif len(parts) >= 3:ips.append((parts[0], parts[1], parts[2]))else:ips.append((parts[0], parts[1] if len(parts) > 1 else None, None))return ipsdef ensure_output_dir(path):if not os.path.exists(path):os.makedirs(path, exist_ok=True)def timestamp():return datetime.now().strftime("%Y%m%d_%H%M%S")def open_ssh_client(ip, username, password, port=SSH_PORT, timeout=CONNECT_TIMEOUT):client = paramiko.SSHClient()client.set_missing_host_key_policy(paramiko.AutoAddPolicy())client.connect(ip, port=port, username=username, password=password, timeout=timeout, look_for_keys=False, allow_agent=False)return clientdef recv_all_from_shell(shell, timeout=CMD_TIMEOUT, pause=0.2):"""非阻塞读取:读取直到超时段内没有新数据。"""output = b""end_time = time.time() + timeoutwhile True:while shell.recv_ready():try:chunk = shell.recv(READ_CHUNK)if not chunk:breakoutput += chunkend_time = time.time() + timeoutexcept Exception:breakif time.time() > end_time:breaktime.sleep(pause)try:return output.decode('utf-8', errors='ignore')except Exception:return output.decode('latin1', errors='ignore')def send_cmd_and_recv(shell, cmd, cmd_timeout=CMD_TIMEOUT, short_sleep=0.2):"""发送命令并收集输出(支持自动翻页)。返回 (out_str, error_str)"""if not cmd.endswith("\n"):to_send = cmd + "\n"else:to_send = cmdtry:shell.send(to_send)except Exception as e:return "", "send_error: " + str(e)time.sleep(short_sleep)out = recv_all_from_shell(shell, timeout=cmd_timeout)# 如果输出包含分页提示则自动翻页(发送空格)if out and _PAGING_PAT.search(out):accum = outfor i in range(300):  # 上限,避免无限循环try:shell.send(" ")except Exception:breaktime.sleep(0.12)more = recv_all_from_shell(shell, timeout=1.0)if not more:breakaccum += more# 若本次返回不再包含分页提示,结束if not _PAGING_PAT.search(more):breakout = accumreturn out, Nonedef try_disable_paging(shell):"""尝试发送分页关闭命令(不保存这些输出)。返回拼接的输出字符串(仅用于本地判断/日志),但调用方不会把它保存为主配置文件。"""accum = []for c in PAGING_CMDS:try:out, err = send_cmd_and_recv(shell, c, cmd_timeout=1.0)accum.append(f"CMD: {c}\n{out or ''}\nerr:{err}")except Exception as e:accum.append(f"CMD: {c}\nEXCEPTION: {e}")return "\n".join(accum)def detect_vendor_from_text(text):if not text:return Nonetl = text.lower()for vendor, keys in VENDOR_KEYWORDS.items():for k in keys:if k.lower() in tl:return vendorreturn Nonedef try_commands_and_collect(shell, cmds, cmd_timeout=CMD_TIMEOUT):"""依次尝试 cmds:- 对于明显用于关闭分页/切换上下文的命令(非配置导出命令),发送后丢弃输出- 对于疑似配置导出命令(包含 display/show running-config/current-configuration 等关键词),发起捕获并返回 raw 输出返回 (successful_cmd_or_None, raw_output_or_None, tried_cmd_list)tried_cmd_list 为命令字符串列表(仅便于 clean 函数去除回显)"""tried = []for c in cmds:tried.append(c)lower = c.strip().lower()# 判断是否可能为配置导出命令(较宽的匹配)if any(k in lower for k in ("display current-configuration", "display this-configuration", "show running-config", "show full-configuration", "show configuration", "show config", "show running", "show full")):# 将该命令视为配置导出命令,开始 captureout, err = send_cmd_and_recv(shell, c, cmd_timeout=max(cmd_timeout, 10.0))if out and out.strip():return c, out, tried# 若没有拿到输出,继续尝试下一个命令continue# 否则把它当成分页关闭或上下文切换之类的命令,发送但不当作最终配置输出try:send_cmd_and_recv(shell, c, cmd_timeout=1.5)except Exception:# 忽略错误,继续尝试下一条命令passtime.sleep(0.12)# 如果循环结束仍未明确成功,则返回 None,caller 会再尝试 fallbackreturn None, None, trieddef clean_config_output_v2(text, successful_cmd=None, tried_cmds=None):"""更强力的配置清理函数(原样复用并允许传入 tried_cmds 以去掉回显)"""if not text:return ""# 1. 去掉 ANSI 控制码t = re.sub(r'\x1B\[[0-?]*[ -/]*[@-~]', '', text)# 2. 统一换行并按行处理t = t.replace('\r\n', '\n').replace('\r', '\n')lines = t.split('\n')# 3. 构建一些便捷匹配集合tried_set = set()if tried_cmds:for c in tried_cmds:if c:tried_set.add(c.strip().lower())if successful_cmd:tried_set.add(successful_cmd.strip().lower())# helper 判断是否是 prompt 行(像 <SW6> 或 hostname> 或 hostname#)def is_prompt_line(l):s = l.strip()if not s:return False# <SW6>if re.match(r'^<[^<>]+>$', s):return True# name> or name# (单独的提示符)if re.match(r'^[\w\-\._]+[>#]$', s):return Truereturn False# helper 判断是否是命令回显(完全等于某个尝试过的命令)def is_cmd_echo(l):s = l.strip().lower()if s in tried_set:return Truefor cmd in tried_set:if cmd and s.startswith(cmd) and len(s) <= len(cmd) + 6:return Truereturn False# helper 判断是否是错误/噪音行def is_error_line(l):s = l.strip()if not s:return Falseif s.startswith('%') or s == '^' or s.startswith('^'):return Trueif re.search(r'unrecognized command|invalid input|command not found|Unrecognized', s, re.IGNORECASE):return Truereturn False# helper 判断是否为“装饰性行”(大量符号)def is_decorative(l):s = l.strip()if not s:return Falseif len(s) >= 10 and (re.sub(r'[\W_]', '', s) == '' or (sum(ch.isalnum() for ch in s) / len(s)) < 0.2):return Trueif re.search(r'copyright', s, re.IGNORECASE) and len(s) < 200:return Truereturn False# 4. 先去掉明显噪声行(但保持行索引,以便后续定位 config 起点)cleaned_lines = []for ln in lines:if is_prompt_line(ln):continueif is_cmd_echo(ln):continueif is_error_line(ln):continueif is_decorative(ln):continuecleaned_lines.append(ln)# 5. 在 cleaned_lines 里寻找配置起点cfg_start_keywords = [re.compile(r'^\s*#\s*$', re.IGNORECASE),re.compile(r'^\s*version\b', re.IGNORECASE),re.compile(r'^\s*sysname\b', re.IGNORECASE),re.compile(r'^\s*hostname\b', re.IGNORECASE),re.compile(r'^\s*interface\b', re.IGNORECASE),re.compile(r'current-configuration', re.IGNORECASE),re.compile(r'running-config', re.IGNORECASE),re.compile(r'^\s*!', re.IGNORECASE),]start_idx = Nonefor i, ln in enumerate(cleaned_lines):for p in cfg_start_keywords:if p.search(ln):if p.pattern == r'^\s*#\s*$':if i + 1 < len(cleaned_lines):nxt = cleaned_lines[i + 1]if re.search(r'^\s*version\b|^\s*sysname\b|^\s*hostname\b|^\s*interface\b|current-configuration|running-config', nxt, re.IGNORECASE):start_idx = ibreakelse:start_idx = ibreakelse:start_idx = ibreakelse:start_idx = ibreakif start_idx is not None:break# 6. 如果没找到起点,尝试根据 CONFIG_HINTS 的 regex 找第一个出现位置if start_idx is None:joined = '\n'.join(cleaned_lines)m = _CONFIG_HINTS.search(joined)if m:pos = m.start()up_to = joined[:pos]start_idx = up_to.count('\n')else:out = '\n'.join(l for l in cleaned_lines).strip()out = re.sub(r'\n\s*\n+', '\n\n', out)return out# 7. 从 start_idx 开始取后续行,并再做最后清理(去掉行首/尾多余空格,去掉连续多空行)final_lines = cleaned_lines[start_idx:]while final_lines and not final_lines[0].strip():final_lines.pop(0)final_lines2 = []for ln in final_lines:if is_prompt_line(ln):continueif is_error_line(ln):continuefinal_lines2.append(ln.rstrip())out = '\n'.join(final_lines2).strip()out = re.sub(r'\n\s*\n+', '\n\n', out)return outdef save_output(ip, vendor, raw_text, cleaned_text, prefix=OUTPUT_DIR):"""保存文件:如果 SAVE_CLEAN_ONLY 为 True 则仅保存 cleaned_text,否则保存 raw + clean返回已保存的主路径(clean 的路径)"""ensure_output_dir(prefix)ts = timestamp()safe_vendor = vendor if vendor else "unknown"base = f"{ip}_{safe_vendor}_{ts}"clean_path = os.path.join(prefix, base + "_clean.txt")try:with open(clean_path, "w", encoding="utf-8") as f:f.write(cleaned_text or "")except Exception as e:print(f"[{ip}] 写 clean 文件失败: {e}")if not SAVE_CLEAN_ONLY and raw_text is not None:raw_path = os.path.join(prefix, base + "_raw.txt")try:with open(raw_path, "w", encoding="utf-8") as f:f.write(raw_text or "")except Exception as e:print(f"[{ip}] 写 raw 文件失败: {e}")return clean_path# ---------- 主流程 ----------
def process_device(ip, user, pwd, port=SSH_PORT):ip_str = ipprint("-> 处理:", ip_str)try:client = open_ssh_client(ip, username=user, password=pwd, port=port)except Exception as e:msg = f"SSH 连接失败: {e}"print(msg)return {"ip": ip, "ok": False, "error": msg}try:shell = client.invoke_shell()time.sleep(0.2)except Exception as e:client.close()msg = f"invoke_shell 失败: {e}"print(msg)return {"ip": ip, "ok": False, "error": msg}# 读取初始 banner(不保存)time.sleep(0.2)try:intro = recv_all_from_shell(shell, timeout=1.0)except Exception:intro = ""# 尝试关闭分页(输出不保存)try:_ = try_disable_paging(shell)except Exception:pass# 探测厂商(发送若干探测命令,输出不保存)detect_cmds = ["display version", "show version", "get system status", "show system info", "uname -a"]detect_text = intro or ""for dc in detect_cmds:try:out, err = send_cmd_and_recv(shell, dc, cmd_timeout=1.5)except Exception:out = ""if out:detect_text += "\n" + outif re.search(r"huawei|h3c|cisco|forti|fortigate|ruijie|hillstone|sangfor|ios|vrp|comware", out, re.IGNORECASE):breakvendor = detect_vendor_from_text(detect_text) or "unknown"print(f"   识别厂商为: {vendor}")# 尝试按厂商命令导出配置 —— 仅在配置命令上开始 capture 并保存cmds = VENDOR_CONFIG_CMDS.get(vendor, VENDOR_CONFIG_CMDS["unknown"])successful_cmd, raw_output, tried_cmds = try_commands_and_collect(shell, cmds, cmd_timeout=CMD_TIMEOUT)# 若没有拿到,作为最后手段再尝试常见命令并长时间读取(这些输出将被当作配置输出尝试保存)if not raw_output:fallback_cmds = ["display current-configuration", "show running-config", "show full-configuration"]for fc in fallback_cmds:try:out, err = send_cmd_and_recv(shell, fc, cmd_timeout=10.0)except Exception:out = ""tried_cmds.append(fc)if out and out.strip():raw_output = outsuccessful_cmd = fcbreak# 如果仍然没有明显配置输出,尝试读取 shell 剩余的输出保存以便排查(但这并非配置)if not raw_output:time.sleep(0.5)more = recv_all_from_shell(shell, timeout=2.0)if more and more.strip():raw_output = more# 只保存配置相关内容:clean 后写入文件;raw 可选cleaned = clean_config_output_v2(raw_output or "", successful_cmd=successful_cmd, tried_cmds=tried_cmds)saved_path = save_output(ip, vendor, raw_output, cleaned)print(f"   输出已保存: {saved_path}")try:shell.close()except Exception:passtry:client.close()except Exception:passok = bool(cleaned and _CONFIG_HINTS.search(cleaned))return {"ip": ip, "ok": ok, "vendor": vendor, "path": saved_path, "raw_out": raw_output, "clean_out": cleaned}def main():print("批量导出设备配置脚本(仅保存配置阶段输出)")ips = read_ip_file(IP_FILE)if not ips:print("ip 列表为空,请在 ip.txt 中每行写入一个 IP(或 ip,username,password)")returnneed_cred = any(u is None or p is None for (_, u, p) in ips)default_user = Nonedefault_pass = Noneif need_cred:default_user = input("请输入 SSH 用户名: ").strip()default_pass = getpass("请输入 SSH 密码: ")ensure_output_dir(OUTPUT_DIR)results = []for ip, u, p in ips:user = u if u else default_userpwd = p if p else default_passif not user or not pwd:print(f"缺少该设备 {ip} 的用户名或密码,跳过")results.append({"ip": ip, "ok": False, "error": "no credentials"})continuetry:res = process_device(ip, user, pwd, port=SSH_PORT)results.append(res)except Exception as e:print(f"处理 {ip} 时异常: {e}")results.append({"ip": ip, "ok": False, "error": str(e)})print("\n处理完成,总结:")for r in results:if r.get("ok"):print(f" {r['ip']} -> OK, vendor={r.get('vendor')}, file={r.get('path')}")else:print(f" {r['ip']} -> MAYBE FAILED (仍保存了可供排查的文件), reason={r.get('error')}, file={r.get('path')}")if __name__ == "__main__":try:main()except KeyboardInterrupt:print("\n用户中断,退出")sys.exit(1)

四、实验结果:

建议在终端运行,pycharm运行,getpass会卡住

可以看到每台设备都成功登录,并将配置保存在txt文件(2.15应该是设备问题)

最终生成的文件:

实验完成!

http://www.xdnf.cn/news/1297135.html

相关文章:

  • AM32电调学习-使用Keil编译uboot
  • 搭建局域网yum源仓库全流程
  • 华为实验 链路聚合
  • GoLand 项目从 0 到 1:第八天 ——GORM 命名策略陷阱与 Go 项目启动慢问题攻坚
  • 更新pip及Python软件包的完整指南
  • STM32HAL 快速入门(七):GPIO 输入之光敏传感器控制蜂鸣器
  • 第3节 深度学习避坑指南:从过拟合到玄学优化
  • 92、23种设计模式-单例模式
  • 【软考架构】信息安全基础知识
  • 考研408《计算机组成原理》复习笔记,第五章(1)——CPU功能和结构
  • 云原生存储架构设计与性能优化
  • 【深度学习计算性能】04:硬件
  • CTFSHOW | nodejs题解 web334 - web344
  • 主进程如何将客户端连接分配到房间进程
  • 数巅中标中建科技AI知识库项目,开启建筑业数智化新篇章
  • 项目日志框架与jar中日志框架冲突 解决
  • MFC的使用——使用ChartCtrl绘制曲线
  • DataHub IoT Gateway:工业现场设备与云端平台安全互联的高效解决方案
  • 使用HalconDotNet实现异步多相机采集与实时处理
  • 零信任架构(Zero Trust Architecture, ZTA)(通过动态验证和最小权限控制,实现对所有访问请求的严格授权和持续监控)
  • Kafka消费者组
  • OpenCV阈值处理详解
  • Docker pull拉取镜像命令的入门教程
  • K8s学习----Namespace:资源隔离与环境管理的核心机制
  • Rabbitmq+STS+discovery_k8s +localpv部署排坑详解
  • 希尔排序专栏
  • C++ 仿RabbitMQ实现消息队列项目
  • Trae x Figma MCP一键将设计稿转化为精美网页
  • 通信算法之313:FPGA中实现滑动相关消耗DSP资源及7045/7035的乘法器资源
  • Mysql基本使用语句(一)