python 示例

简单操作mysql

#!/usr/bin/env python
# coding=utf-8

import pymysql
import logging
import traceback
import json
import redis

from DBUtils.PooledDB import PooledDB
from controller.core import checkArgs, getIni


class DbHelper(object):
    """
        操作mysql数据库的类
        @ section   指定ini配置文件中的区块名
        @ db_host   指定数据IP地址
        @ db_user   指定数据库连接帐号
        @ db_pass   指定数据库连接密码
        @ db_name   指定数据库名
        @ db_port   指定数据库连接端口
    """
    __pool = None

    def __init__(self, db_host=None, db_user=None, db_pass=None, db_name=None, db_port=None):
        # 数据库构造函数,从连接池中取出连接,并生成操作游标
        self.db_host = db_host 
        self.db_user = db_user
        self.db_pass = db_pass
        self.db_name = db_name
        self.db_port = db_port
        self._conn = self.connection()

    def connection(self):
        if self.__pool is None:
            try:
                self.__pool = PooledDB(creator=pymysql, mincached=1, maxcached=50, host=self.db_host, user=self.db_user, passwd=self.db_pass, db=self.db_name, port=int(self.db_port), charset="utf8")
            except Exception:
                logging.error('数据库连接失败:%s' % traceback.format_exc())
        return self.__pool.connection()

    def writeDb(self, sql, data=(), transaction='default'):
        """
        操作数据库:insert update delete
        如果要开启事务,在第三个参数加上以下字符
        开启事务,加上参数 'start'
        中间过程,加上参数 'contine'
        结束事务,加上参数 'end'
        """
        result = None
        if transaction == 'start' or transaction == 'default':
            # 启动数据库连接,如果连接失败,则返回False
            pass

        try:
            self._cursor = self._conn.cursor()
            self._cursor.execute(sql, data)  # 执行SQL
        except Exception:
            self._cursor.close()  # 关闭游标
            self.rollback()  # 操作异常时,操作回滚
            logging.error('Write 数据库操作失败:%s' % traceback.format_exc())
            return False
        else:
            # 获取数据,主要是在需要insert时,获取最新的ID等场景
            try:
                result = self._cursor.lastrowid
            except Exception:
                logging.error('Write 数据库操作失败:%s' % traceback.format_exc())
                self._cursor.close()  # 关闭游标

        # 当不开启事务,或事务结束的时候,提交事务和关闭连接
        if transaction == 'end' or transaction == 'default':
            self.commit()  # 提交事务

        # 如果写入数据后,有返回数据,则把该数据返回给调用者
        if result:
            return result
        return self._cursor.rowcount  # 返回操作成功的记录条数

    def readDb(self, sql, data=()):
        """查询数据库:select"""
        result = False
        try:
            self._cursor = self._conn.cursor()
            self._cursor.execute(sql, data)  # 执行SQL
        except Exception:
            logging.error('Read 数据库操作失败:%s' % traceback.format_exc())
            self._cursor.close()
        else:
            result = [dict((self._cursor.description[i][0], value) for i, value in enumerate(row)) for row in
                      self._cursor.fetchall()]
            self._cursor.close()
            self.commit()
        finally:
            return result

    def rollback(self):
        """回滚操作"""
        try:
            self._conn.rollback()  # 操作异常时,操作回滚
        except Exception:
            pass

    def commit(self):
        """提交事务"""
        try:
            self._conn.commit()
        except Exception:
            pass

    def dispose(self):
        """
        释放连接池资源
        """
        self._conn.close()

SSH登陆交换机批量执行命令一

#!/usr/bin/env python3.7
# coding=utf-8
"""
    python 版本3.7
    SSH登陆华为交换机执行批量命令
"""

import asyncio
import asyncssh
import sys
import re

host = "192.168.1.2"
username = "dairufeng"
password = "blog.linuxyw.com"

cmd = """
    return
    dis version
    dis cur
    #
    dis tcp status
"""


def validate_the_command_return_end(line):
    """
    交换机判断下发命令是否返回结束
    """
    search_str_content = ">|]"
    search_end_content = "##\r\n"
    search_str_result = re.search(search_str_content, line)
    search_end_result = re.search(search_end_content, line)
    return search_str_result and search_end_result


async def run_client():
    async with asyncssh.connect(host=host, username=username, password=password) as conn:
        async with conn.create_process(stderr=asyncssh.STDOUT) as process:
            process.stdin.write('return\n' + cmd + '\n##\n')
            while 1:
                result = await process.stdout.readline()
                # 检测命令是否执行完毕,如果完毕则跳出循环,结束
                if validate_the_command_return_end(result) or not result:
                    print("############  end  ############")
                    break
                # 如果执行的命令中,信息过多,会提示More字符,需要继续发送回车才能继续输出后续的内容
                if result.find('More') > 0:
                    process.stdin.write('\n##\n')
                print(result, end='')

try:
    asyncio.run(run_client())
except (OSError, asyncssh.Error) as exc:
    sys.exit('SSH connection failed: ' + str(exc))

备注

asyncioasyncsshpython3.4+上支持
此段代码运行环境是在python3.7asyncio.run()方法3.7上实现,低于此版本请用get_event_loop().run_until_complete()
批量执行的命令全部写在cmd的"""内,每条命令为一行,代码中的命令只为演示随便写的

SSH登陆交换机批量执行命令二

#!/usr/bin/env python
# coding=utf-8
"""
    python 版本3.7
    SSH登陆华为交换机执行批量命令
"""

import asyncio
import time
import asyncssh
import sys
import re
import traceback
from queue import Queue

# from controller.core import getIni  # 以下几行,是从配置文件读取参数
#
# username = getIni("switch", "username")
# password = getIni("switch", "password")
# port = getIni("switch", "port")
switch_host_process_dic = dict()
session_timeout = 600  # 交换机session有效期时间(秒)


def validate_the_command_return_end(line):
    """
    交换机判断下发命令是否返回结束
    在SSH执行命令时,额外多发送了2个#号,就是此处用来捕获后判断是否命令返回结束
    """
    search_str_content = ">|]"
    search_end_content = "##\r\n"
    search_str_result = re.search(search_str_content, line)
    search_end_result = re.search(search_end_content, line)
    return search_str_result and search_end_result


class MySSHClient(asyncssh.SSHClient):
    """
    回调方法
    """

    def connection_made(self, conn):
        """
        创建连接是回调
        :param conn:
        :return:
        """
        print('SSH connection received from %s.' %
              conn.get_extra_info('peername')[0])

    def connection_lost(self, exc):
        """
        连接失败时回回调
        :param exc:
        :return:
        """
        if exc:
            print('SSH connection error: ' + str(exc), file=sys.stderr)
        else:
            print('SSH connection closed.')

    def data_received(self, data):
        print('received {} bytes'.format(len(data)))

    def eof_received(self):
        print('EOF received')
        self.transport.close()


async def run_client(host: str, cmds_dict: dict, queue: object, retry_num: int = 1) -> bool:
    """
    异步执行SSH,并把执行返回的结果放到队列返回
    :param host: 交换机IP
    :param cmds_dict: 执行的命令列表
    :param queue: 返回执行结果的队列
    :param retry_num: 第几次重试
    :return:
    """
    curr_time = time.time()  # 当前时间
    results = {}  # 存储要返回的结果
    print(cmds_dict)
    try:
        # 复用session,交换机默认session有效期是10分钟,过期失效要重新登陆
        if host in switch_host_process_dic and (curr_time - switch_host_process_dic[host]["time"] < session_timeout):
            print("复用连接:离对象失效还差 %s 秒" % (session_timeout - (curr_time - switch_host_process_dic[host]["time"])))
            process = switch_host_process_dic[host]["process"]
            switch_host_process_dic[host]["time"] = curr_time
        else:
            print(f"创建新的连接,现在是第{retry_num}次连接!")
            if host not in switch_host_process_dic:
                switch_host_process_dic[host] = {"retry": retry_num}

            conn, client = await asyncio.wait_for(
                asyncssh.create_connection(MySSHClient, host=host, port=int(port),
                                           username=username, password=password, known_hosts=None,
                                           client_keys=None), timeout=20)
            process = await conn.create_process(stderr=asyncssh.STDOUT)

            # 存储对象,在session过期前可以复用,免去ssh验陆验证和减少会话数(交换机默认用户数是5个)
            switch_host_process_dic[host]["process"] = process
            switch_host_process_dic[host]["time"] = curr_time
        for host_key, cmds in cmds_dict.items():
            results[host_key] = {}
            for i, cmd in enumerate(cmds):
                if i == 0:
                    # 在执行第一条命令前,先输入去除"More"功能(关闭分屏功能),
                    cmd_content = 'return\n' + "screen-length 0 temporary\n" + cmd
                process.stdin.write(cmd_content + '\n##\n')

                while 1:
                    result = await process.stdout.readline()
                    if validate_the_command_return_end(result) or not result:
                        # 如果检测到数据最后一行,则退出循环
                        break

                    if cmd not in results[host_key]:
                        results[host_key][cmd] = result
                    else:
                        results[host_key][cmd] = results[host_key].get(cmd) + result

                    if result.find('More') > 0:
                        # 如果出现"Mone"字段,则继续发送回车,让后面数据显示出来
                        process.stdin.write('\n##\n')
        queue.put(results)
        return True
    except BrokenPipeError:
        print(f"连接失败,进行第{retry}次重连接")
        switch_host_process_dic[host]["retry"] += 1
        retry = switch_host_process_dic[host]["retry"]

        if retry >= 4:
            del switch_host_process_dic[host]
            print("已进行完3次连接,结束重试")
            return False

        coroutine = run_client(host, cmds_dict, queue, retry)
        asyncio.ensure_future(coroutine)
    except asyncssh.misc.PermissionDenied:
        print("帐号或密码错误,登陆失败!")
        return False
    except Exception:
        print(traceback.format_exc())
        return False


if __name__ == "__main__":
    username = "dairufeng"
    password = "www.linuxyw.com"
    port = 22
    host = "192.168.54.202"
    cmds_dict = {
        '192.168.54.202__1/0/3,1/0/4__1': ['dis ver', 'dis cu'],
    }   # 这数据格式,是根据项目数据结构而定的
    queue = Queue()
    asyncio.run(run_client(host=host,cmds_dict=cmds_dict,queue=queue))
    while not queue.empty():
        print(queue.get())

备注

上面这段代码是从项目中截下来,小做了修改后发上来的,这段代码主要是做为ssh交换交换机使用,对交换机登陆的 session进行了复用(交换机默认是10分钟有效期,失效后要重新默认,且默认最多是5个会话,多了会连接失败,复用的好处是减少会话数, 省去登陆的过程,在并发场景提升效率)。实际项目中,会用并发调用此方法,而且不是像这里简单的使用