脚本只测试过H3C S12500系列交换机,只支持10GE、40GE、100GE接口。
#Author: An import ping3 import paramiko import re import time import xlwt import os def Get_Info(): f = open('./hostlist.txt', 'r') for line in f: line = line.strip('\n') SSH_Connect(line) def SSH_Connect(ssh_ip): print('正在通过SSH获取数据中,请稍等~') ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh_ping = ping3.ping(ssh_ip) #SSH钱先进行PING测试设备 if ssh_ping == None: print('%s 网络无法PING通,请检查' % ssh_ip) else: #通过SSH登录设备获取光衰信息 ssh.connect(ssh_ip, 22, 'username', 'password') channel = ssh.invoke_shell() time.sleep(1) channel.send('screen-length disable\ndisplay curr | in sysname\ndisplay transceiver diagnosis interface\n') time.sleep(60) recv_temp = channel.recv(9999999).decode(errors='ignore') #将获取的结果写入到临时文件中 f = open('./H3C_temp.log', 'w+') f.write(recv_temp) f.close() def Get_Trans(POWER): print('正在判断光衰值,请稍等~') Trans_list = [] Trans_list_temp = [] Interface_ID = [] rx = 0 tx = 0 bias = 0 f = open('./H3C_temp.log', 'r') for line in f: #读取设备名称 if re.findall('sysname', line): Hostname = line.split(' ')[2] #读取接口ID if re.match('Ten|Forty|Hundred', line): Interface_ID = line.split(' ') #判断接口为40GE和100GE的光衰情况 if len(Interface_ID) != 0 and re.match('Forty|Hundred', Interface_ID[0]): #判断接口是否缺少模块 if re.match('The transceiver is absent', line): print(Interface_ID[0] + '缺少模块') Trans_list_temp.append(Hostname) Trans_list_temp.append(Interface_ID[0]) Trans_list_temp.append('接口缺少模块') Trans_list.append(Trans_list_temp) Trans_list_temp = [] #判断是否40GE/100GE的光衰值行内容 if line != '\n' and re.match(r'\d', line[4]): #取数据,由于re返回结果为list,因此用其他变量取值更明朗写,也可以简化写一下,但是re需要运行多次 temp = re.findall(r'-?\d+\.?\d*', line) rx = float(temp[2]) bias = float(temp[1]) tx = float(temp[3]) #判断收光大于设定值,并且bias不等于0,tx和rx不能相等的情况才是需要的rx值 #TX=RX相等的情况下说明接口被shutdown掉,不做处理。bias在H3C中不准确,不确定该值是否影响结果 if rx < POWER and (rx != -40.0 or rx != -36.96) and bias != 0.00 and rx != tx: Trans_list_temp.append(Hostname) Trans_list_temp.append(Interface_ID[0]) Trans_list_temp.append(rx) rx = 0 bias = 0 tx = 0 Trans_list.append(Trans_list_temp) Trans_list_temp = [] #判断10GE接口 if len(Interface_ID) != 0 and re.match('Ten', Interface_ID[0]): if line != '\n' and re.match(r'\d', line[4]): temp = re.findall(r'-?\d+\.?\d*', line) rx = float(temp[3]) bias = float(temp[2]) tx = float(temp[4]) if rx < POWER and (rx != -40.0 or rx != -36.96) and bias != 0.00 and rx != tx: Trans_list_temp.append(Hostname) Trans_list_temp.append(Interface_ID[0]) Trans_list_temp.append(rx) rx = 0 bias = 0 tx = 0 Trans_list.append(Trans_list_temp) Trans_list_temp = [] print('.', end='') f.close() print('光衰值判断完毕,准备写入,请稍等~') return Trans_list def Write_Info(Trans_list): workbook_result = xlwt.Workbook(encoding='utf-8') sheet = workbook_result.add_sheet(u'sheet1', cell_overwrite_ok=True) # 设置标题样式和内容样式 style_title = xlwt.XFStyle() style_context = xlwt.XFStyle() font = xlwt.Font() align = xlwt.Alignment() align.horz = 0x02 align.vert = 0x01 style_title.alignment = align style_context.alignment = align font.bold = True style_title.font = font # 设置列宽 sheet.col(0).width = 10000 sheet.col(1).width = 10000 sheet.col(2).width = 10000 # 写表格表头 sheet.write(0, 0, '交换机名称', style_title) sheet.write(0, 1, '交换机端口ID', style_title) sheet.write(0, 2, 'RX Power(dBm)', style_title) # 循环写入表格 i = 0 for i in range(i, len(Trans_list)): for j in range(3): sheet.write(i + 1, j, Trans_list[i][j], style_context) workbook_result.save('H3C_Trans_result.xlsx') print('程序执行完毕~') if __name__ == '__main__': POWER = -4 Get_Info() Trans_list = Get_Trans(POWER) Write_Info(Trans_list) os.remove('./H3C_temp.log')