脚本只测试过H3C S12500系列交换机,只支持10GE、40GE、100GE接口。
#Author: An
import ping3
import paramiko
import re
import time
import xlwt
import os
def Get_Info():
f = open('./hostlist.txt', 'r')
for line in f:
line = line.strip('\n')
SSH_Connect(line)
def SSH_Connect(ssh_ip):
print('正在通过SSH获取数据中,请稍等~')
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_ping = ping3.ping(ssh_ip)
#SSH钱先进行PING测试设备
if ssh_ping == None:
print('%s 网络无法PING通,请检查' % ssh_ip)
else:
#通过SSH登录设备获取光衰信息
ssh.connect(ssh_ip, 22, 'username', 'password')
channel = ssh.invoke_shell()
time.sleep(1)
channel.send('screen-length disable\ndisplay curr | in sysname\ndisplay transceiver diagnosis interface\n')
time.sleep(60)
recv_temp = channel.recv(9999999).decode(errors='ignore')
#将获取的结果写入到临时文件中
f = open('./H3C_temp.log', 'w+')
f.write(recv_temp)
f.close()
def Get_Trans(POWER):
print('正在判断光衰值,请稍等~')
Trans_list = []
Trans_list_temp = []
Interface_ID = []
rx = 0
tx = 0
bias = 0
f = open('./H3C_temp.log', 'r')
for line in f:
#读取设备名称
if re.findall('sysname', line):
Hostname = line.split(' ')[2]
#读取接口ID
if re.match('Ten|Forty|Hundred', line):
Interface_ID = line.split(' ')
#判断接口为40GE和100GE的光衰情况
if len(Interface_ID) != 0 and re.match('Forty|Hundred', Interface_ID[0]):
#判断接口是否缺少模块
if re.match('The transceiver is absent', line):
print(Interface_ID[0] + '缺少模块')
Trans_list_temp.append(Hostname)
Trans_list_temp.append(Interface_ID[0])
Trans_list_temp.append('接口缺少模块')
Trans_list.append(Trans_list_temp)
Trans_list_temp = []
#判断是否40GE/100GE的光衰值行内容
if line != '\n' and re.match(r'\d', line[4]):
#取数据,由于re返回结果为list,因此用其他变量取值更明朗写,也可以简化写一下,但是re需要运行多次
temp = re.findall(r'-?\d+\.?\d*', line)
rx = float(temp[2])
bias = float(temp[1])
tx = float(temp[3])
#判断收光大于设定值,并且bias不等于0,tx和rx不能相等的情况才是需要的rx值
#TX=RX相等的情况下说明接口被shutdown掉,不做处理。bias在H3C中不准确,不确定该值是否影响结果
if rx < POWER and (rx != -40.0 or rx != -36.96) and bias != 0.00 and rx != tx:
Trans_list_temp.append(Hostname)
Trans_list_temp.append(Interface_ID[0])
Trans_list_temp.append(rx)
rx = 0
bias = 0
tx = 0
Trans_list.append(Trans_list_temp)
Trans_list_temp = []
#判断10GE接口
if len(Interface_ID) != 0 and re.match('Ten', Interface_ID[0]):
if line != '\n' and re.match(r'\d', line[4]):
temp = re.findall(r'-?\d+\.?\d*', line)
rx = float(temp[3])
bias = float(temp[2])
tx = float(temp[4])
if rx < POWER and (rx != -40.0 or rx != -36.96) and bias != 0.00 and rx != tx:
Trans_list_temp.append(Hostname)
Trans_list_temp.append(Interface_ID[0])
Trans_list_temp.append(rx)
rx = 0
bias = 0
tx = 0
Trans_list.append(Trans_list_temp)
Trans_list_temp = []
print('.', end='')
f.close()
print('光衰值判断完毕,准备写入,请稍等~')
return Trans_list
def Write_Info(Trans_list):
workbook_result = xlwt.Workbook(encoding='utf-8')
sheet = workbook_result.add_sheet(u'sheet1', cell_overwrite_ok=True)
# 设置标题样式和内容样式
style_title = xlwt.XFStyle()
style_context = xlwt.XFStyle()
font = xlwt.Font()
align = xlwt.Alignment()
align.horz = 0x02
align.vert = 0x01
style_title.alignment = align
style_context.alignment = align
font.bold = True
style_title.font = font
# 设置列宽
sheet.col(0).width = 10000
sheet.col(1).width = 10000
sheet.col(2).width = 10000
# 写表格表头
sheet.write(0, 0, '交换机名称', style_title)
sheet.write(0, 1, '交换机端口ID', style_title)
sheet.write(0, 2, 'RX Power(dBm)', style_title)
# 循环写入表格
i = 0
for i in range(i, len(Trans_list)):
for j in range(3):
sheet.write(i + 1, j, Trans_list[i][j], style_context)
workbook_result.save('H3C_Trans_result.xlsx')
print('程序执行完毕~')
if __name__ == '__main__':
POWER = -4
Get_Info()
Trans_list = Get_Trans(POWER)
Write_Info(Trans_list)
os.remove('./H3C_temp.log')
#Author: An
import ping3
import paramiko
import re
import time
import xlwt
import os
def Get_Info():
f = open('./hostlist.txt', 'r')
for line in f:
line = line.strip('\n')
SSH_Connect(line)
def SSH_Connect(ssh_ip):
print('正在通过SSH获取数据中,请稍等~')
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_ping = ping3.ping(ssh_ip)
#SSH钱先进行PING测试设备
if ssh_ping == None:
print('%s 网络无法PING通,请检查' % ssh_ip)
else:
#通过SSH登录设备获取光衰信息
ssh.connect(ssh_ip, 22, 'username', 'password')
channel = ssh.invoke_shell()
time.sleep(1)
channel.send('screen-length disable\ndisplay curr | in sysname\ndisplay transceiver diagnosis interface\n')
time.sleep(60)
recv_temp = channel.recv(9999999).decode(errors='ignore')
#将获取的结果写入到临时文件中
f = open('./H3C_temp.log', 'w+')
f.write(recv_temp)
f.close()
def Get_Trans(POWER):
print('正在判断光衰值,请稍等~')
Trans_list = []
Trans_list_temp = []
Interface_ID = []
rx = 0
tx = 0
bias = 0
f = open('./H3C_temp.log', 'r')
for line in f:
#读取设备名称
if re.findall('sysname', line):
Hostname = line.split(' ')[2]
#读取接口ID
if re.match('Ten|Forty|Hundred', line):
Interface_ID = line.split(' ')
#判断接口为40GE和100GE的光衰情况
if len(Interface_ID) != 0 and re.match('Forty|Hundred', Interface_ID[0]):
#判断接口是否缺少模块
if re.match('The transceiver is absent', line):
print(Interface_ID[0] + '缺少模块')
Trans_list_temp.append(Hostname)
Trans_list_temp.append(Interface_ID[0])
Trans_list_temp.append('接口缺少模块')
Trans_list.append(Trans_list_temp)
Trans_list_temp = []
#判断是否40GE/100GE的光衰值行内容
if line != '\n' and re.match(r'\d', line[4]):
#取数据,由于re返回结果为list,因此用其他变量取值更明朗写,也可以简化写一下,但是re需要运行多次
temp = re.findall(r'-?\d+\.?\d*', line)
rx = float(temp[2])
bias = float(temp[1])
tx = float(temp[3])
#判断收光大于设定值,并且bias不等于0,tx和rx不能相等的情况才是需要的rx值
#TX=RX相等的情况下说明接口被shutdown掉,不做处理。bias在H3C中不准确,不确定该值是否影响结果
if rx < POWER and (rx != -40.0 or rx != -36.96) and bias != 0.00 and rx != tx:
Trans_list_temp.append(Hostname)
Trans_list_temp.append(Interface_ID[0])
Trans_list_temp.append(rx)
rx = 0
bias = 0
tx = 0
Trans_list.append(Trans_list_temp)
Trans_list_temp = []
#判断10GE接口
if len(Interface_ID) != 0 and re.match('Ten', Interface_ID[0]):
if line != '\n' and re.match(r'\d', line[4]):
temp = re.findall(r'-?\d+\.?\d*', line)
rx = float(temp[3])
bias = float(temp[2])
tx = float(temp[4])
if rx < POWER and (rx != -40.0 or rx != -36.96) and bias != 0.00 and rx != tx:
Trans_list_temp.append(Hostname)
Trans_list_temp.append(Interface_ID[0])
Trans_list_temp.append(rx)
rx = 0
bias = 0
tx = 0
Trans_list.append(Trans_list_temp)
Trans_list_temp = []
print('.', end='')
f.close()
print('光衰值判断完毕,准备写入,请稍等~')
return Trans_list
def Write_Info(Trans_list):
workbook_result = xlwt.Workbook(encoding='utf-8')
sheet = workbook_result.add_sheet(u'sheet1', cell_overwrite_ok=True)
# 设置标题样式和内容样式
style_title = xlwt.XFStyle()
style_context = xlwt.XFStyle()
font = xlwt.Font()
align = xlwt.Alignment()
align.horz = 0x02
align.vert = 0x01
style_title.alignment = align
style_context.alignment = align
font.bold = True
style_title.font = font
# 设置列宽
sheet.col(0).width = 10000
sheet.col(1).width = 10000
sheet.col(2).width = 10000
# 写表格表头
sheet.write(0, 0, '交换机名称', style_title)
sheet.write(0, 1, '交换机端口ID', style_title)
sheet.write(0, 2, 'RX Power(dBm)', style_title)
# 循环写入表格
i = 0
for i in range(i, len(Trans_list)):
for j in range(3):
sheet.write(i + 1, j, Trans_list[i][j], style_context)
workbook_result.save('H3C_Trans_result.xlsx')
print('程序执行完毕~')
if __name__ == '__main__':
POWER = -4
Get_Info()
Trans_list = Get_Trans(POWER)
Write_Info(Trans_list)
os.remove('./H3C_temp.log')
#Author: An import ping3 import paramiko import re import time import xlwt import os def Get_Info(): f = open('./hostlist.txt', 'r') for line in f: line = line.strip('\n') SSH_Connect(line) def SSH_Connect(ssh_ip): print('正在通过SSH获取数据中,请稍等~') ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh_ping = ping3.ping(ssh_ip) #SSH钱先进行PING测试设备 if ssh_ping == None: print('%s 网络无法PING通,请检查' % ssh_ip) else: #通过SSH登录设备获取光衰信息 ssh.connect(ssh_ip, 22, 'username', 'password') channel = ssh.invoke_shell() time.sleep(1) channel.send('screen-length disable\ndisplay curr | in sysname\ndisplay transceiver diagnosis interface\n') time.sleep(60) recv_temp = channel.recv(9999999).decode(errors='ignore') #将获取的结果写入到临时文件中 f = open('./H3C_temp.log', 'w+') f.write(recv_temp) f.close() def Get_Trans(POWER): print('正在判断光衰值,请稍等~') Trans_list = [] Trans_list_temp = [] Interface_ID = [] rx = 0 tx = 0 bias = 0 f = open('./H3C_temp.log', 'r') for line in f: #读取设备名称 if re.findall('sysname', line): Hostname = line.split(' ')[2] #读取接口ID if re.match('Ten|Forty|Hundred', line): Interface_ID = line.split(' ') #判断接口为40GE和100GE的光衰情况 if len(Interface_ID) != 0 and re.match('Forty|Hundred', Interface_ID[0]): #判断接口是否缺少模块 if re.match('The transceiver is absent', line): print(Interface_ID[0] + '缺少模块') Trans_list_temp.append(Hostname) Trans_list_temp.append(Interface_ID[0]) Trans_list_temp.append('接口缺少模块') Trans_list.append(Trans_list_temp) Trans_list_temp = [] #判断是否40GE/100GE的光衰值行内容 if line != '\n' and re.match(r'\d', line[4]): #取数据,由于re返回结果为list,因此用其他变量取值更明朗写,也可以简化写一下,但是re需要运行多次 temp = re.findall(r'-?\d+\.?\d*', line) rx = float(temp[2]) bias = float(temp[1]) tx = float(temp[3]) #判断收光大于设定值,并且bias不等于0,tx和rx不能相等的情况才是需要的rx值 #TX=RX相等的情况下说明接口被shutdown掉,不做处理。bias在H3C中不准确,不确定该值是否影响结果 if rx < POWER and (rx != -40.0 or rx != -36.96) and bias != 0.00 and rx != tx: Trans_list_temp.append(Hostname) Trans_list_temp.append(Interface_ID[0]) Trans_list_temp.append(rx) rx = 0 bias = 0 tx = 0 Trans_list.append(Trans_list_temp) Trans_list_temp = [] #判断10GE接口 if len(Interface_ID) != 0 and re.match('Ten', Interface_ID[0]): if line != '\n' and re.match(r'\d', line[4]): temp = re.findall(r'-?\d+\.?\d*', line) rx = float(temp[3]) bias = float(temp[2]) tx = float(temp[4]) if rx < POWER and (rx != -40.0 or rx != -36.96) and bias != 0.00 and rx != tx: Trans_list_temp.append(Hostname) Trans_list_temp.append(Interface_ID[0]) Trans_list_temp.append(rx) rx = 0 bias = 0 tx = 0 Trans_list.append(Trans_list_temp) Trans_list_temp = [] print('.', end='') f.close() print('光衰值判断完毕,准备写入,请稍等~') return Trans_list def Write_Info(Trans_list): workbook_result = xlwt.Workbook(encoding='utf-8') sheet = workbook_result.add_sheet(u'sheet1', cell_overwrite_ok=True) # 设置标题样式和内容样式 style_title = xlwt.XFStyle() style_context = xlwt.XFStyle() font = xlwt.Font() align = xlwt.Alignment() align.horz = 0x02 align.vert = 0x01 style_title.alignment = align style_context.alignment = align font.bold = True style_title.font = font # 设置列宽 sheet.col(0).width = 10000 sheet.col(1).width = 10000 sheet.col(2).width = 10000 # 写表格表头 sheet.write(0, 0, '交换机名称', style_title) sheet.write(0, 1, '交换机端口ID', style_title) sheet.write(0, 2, 'RX Power(dBm)', style_title) # 循环写入表格 i = 0 for i in range(i, len(Trans_list)): for j in range(3): sheet.write(i + 1, j, Trans_list[i][j], style_context) workbook_result.save('H3C_Trans_result.xlsx') print('程序执行完毕~') if __name__ == '__main__': POWER = -4 Get_Info() Trans_list = Get_Trans(POWER) Write_Info(Trans_list) os.remove('./H3C_temp.log')