博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python paramiko 修改源码实现用户命令抓取
阅读量:4345 次
发布时间:2019-06-07

本文共 8877 字,大约阅读时间需要 29 分钟。

paramiko 源码修改

  paramiko主要用来实现ssh客户端、服务端链接,上一节我们说到了堡垒机,堡垒机内有一个需求是“用户行为审计”,在这里我们就可以通过修改paramiko内文件的源码来实现相关要求。

 

paramiko 源码包安装测试
  • 下载地址: 下载源码包paramiko
  • 解压源码包 paramiko-master.zip
  • 使用目录:paramiko-master\paramiko-master\demos
  • python3启动:python paramiko-master\paramiko-master\demos\demos.py

:paramiko demos.py启动 不支持python3 ,由于python3默认是bays类型,需要修改源码。

\demos\interactive.py", line 84, in writeall    sys.stdout.write(data)TypeError: write() argument must be str, not bytes

 

修改源代码(demos.py 添加 python3 支持)
:interactive.py文件内(77~85 行)
def writeall(sock):    while True:        data = sock.recv(256)        if not data:            sys.stdout.write('\r\n*** EOF ***\r\n\r\n')            sys.stdout.flush()            break      # 修改为编译为UTF-8        sys.stdout.write(data.decode())        sys.stdout.flush()
C:\Users\Administrator\demos>python3 demo.pyHostname: 192.168.1.100*** Unable to open host keys file*** WARNING: Unknown host key!Username [Administrator]: rootAuth by (p)assword, (r)sa key, or (d)ss key? [p] pPassword for root@192.168.1.100:*** Here we go!Line-buffered terminal emulation. Press F6 or ^Z to send EOF.Last login: Mon Jan 22 11:24:35 2018 from 192.168.1.150[root@localhost ~]# 注:登陆成功
测试使用

 

修改源码(实现抓取用户操作指令)

注:paramiko-master\paramiko-master\demos\demos.py 文件源码、简要说明
import base64from binascii import hexlifyimport getpassimport osimport selectimport socketimport sysimport timeimport tracebackfrom paramiko.py3compat import inputimport paramikotry:    import interactiveexcept ImportError:    from . import interactivedef agent_auth(transport, username):    """    Attempt to authenticate to the given transport using any of the private    keys available from an SSH agent.    """        agent = paramiko.Agent()    agent_keys = agent.get_keys()    if len(agent_keys) == 0:        return            for key in agent_keys:        # 尝试将本地key验证,验证不了就用密码        print('Trying ssh-agent key %s' % hexlify(key.get_fingerprint()))        try:            transport.auth_publickey(username, key)            print('... success!')            return        except paramiko.SSHException:            print('... nope.')def manual_auth(username, hostname):    default_auth = 'p'    auth = input('Auth by (p)assword, (r)sa key, or (d)ss key? [%s] ' % default_auth)    if len(auth) == 0:        auth = default_auth    # r非对称秘钥    if auth == 'r':        default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_rsa')        path = input('RSA key [%s]: ' % default_path)        if len(path) == 0:            path = default_path        try:            key = paramiko.RSAKey.from_private_key_file(path)        except paramiko.PasswordRequiredException:            password = getpass.getpass('RSA key password: ')            key = paramiko.RSAKey.from_private_key_file(path, password)        t.auth_publickey(username, key)    # 使用DSA 秘钥    elif auth == 'd':        default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_dsa')        path = input('DSS key [%s]: ' % default_path)        if len(path) == 0:            path = default_path        try:            key = paramiko.DSSKey.from_private_key_file(path)        except paramiko.PasswordRequiredException:            password = getpass.getpass('DSS key password: ')            key = paramiko.DSSKey.from_private_key_file(path, password)        t.auth_publickey(username, key)    else:        pw = getpass.getpass('Password for %s@%s: ' % (username, hostname))        t.auth_password(username, pw)# setup loggingparamiko.util.log_to_file('demo.log')username = ''if len(sys.argv) > 1:    hostname = sys.argv[1]    if hostname.find('@') >= 0:        username, hostname = hostname.split('@')else:    # 输入IP    hostname = input('Hostname: ')if len(hostname) == 0:    # 不输入打印错误    print('*** Hostname required.')    sys.exit(1)# 端口号22port = 22if hostname.find(':') >= 0:    hostname, portstr = hostname.split(':')    port = int(portstr)# now connecttry:    # 创建socket对象    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)    # 链接IP端口号    sock.connect((hostname, port))except Exception as e:    # 登陆错误就报错    print('*** Connect failed: ' + str(e))    # 打印错误行    traceback.print_exc()    sys.exit(1)try:    # ssh实例化传入socket    t = paramiko.Transport(sock)    try:        t.start_client()    except paramiko.SSHException:        print('*** SSH negotiation failed.')        sys.exit(1)    try:        keys = paramiko.util.load_host_keys(os.path.expanduser('~/.ssh/known_hosts'))    except IOError:        try:            # 生成rsa 防止证书校验            keys = paramiko.util.load_host_keys(os.path.expanduser('~/ssh/known_hosts'))        except IOError:            print('*** Unable to open host keys file')            keys = {}    # check server's host key -- this is important.    key = t.get_remote_server_key()    if hostname not in keys:        print('*** WARNING: Unknown host key!')    elif key.get_name() not in keys[hostname]:        print('*** WARNING: Unknown host key!')    elif keys[hostname][key.get_name()] != key:        print('*** WARNING: Host key has changed!!!')        sys.exit(1)    else:        print('*** Host key OK.')    # get username    if username == '':        # getuser取出当前用户名        default_username = getpass.getuser()        username = input('Username [%s]: ' % default_username)        if len(username) == 0:            username = default_username    agent_auth(t, username)    # key验证 没通过继续往下走    if not t.is_authenticated():        manual_auth(username, hostname)    if not t.is_authenticated():        print('*** Authentication failed. :(')        # 认证成功就关闭        t.close()        sys.exit(1)    # ssh_session交互    chan = t.open_session()    # get终端  vt100 终端模式    chan.get_pty()    chan.invoke_shell()    print('*** Here we go!\n')    # 交互    # chan是双方打开的一个通道 # 调用interactive文件    interactive.interactive_shell(chan)    chan.close()    t.close()# 表村错误数据关闭链接except Exception as e:    print('*** Caught exception: ' + str(e.__class__) + ': ' + str(e))    traceback.print_exc()    try:        t.close()    except:        pass    sys.exit(1)
文件源码

 

注:paramiko-master\paramiko-master\demos\interactive.py 文件源码修改,实现用户审计

import socketimport sysfrom paramiko.py3compat import u# windows does not have termios...try:    # 不出错代表这是一个linux系统    import termios    import tty    has_termios = Trueexcept ImportError:    has_termios = Falsedef interactive_shell(chan):    if has_termios:        # linux链接        posix_shell(chan)    else:        windows_shell(chan)# chan是传入的通道def posix_shell(chan):    import select        oldtty = termios.tcgetattr(sys.stdin)    try:        tty.setraw(sys.stdin.fileno())        tty.setcbreak(sys.stdin.fileno())        chan.settimeout(0.0)#-----------------1.源码修改---------------------# # 创建列表用来存放客户端输入命令cmd = [] #----------------------------------------------#        while True:            # stdin标准输入,stdout标准输出,标准错误            r, w, e = select.select([chan, sys.stdin], [], [])            # 如果chan在r里面就代表有数据过来了            if chan in r:                try:                    # 数据过来就收数据                    x = u(chan.recv(1024))                    # 如果数据等于0,代表断开链接                    if len(x) == 0:                        sys.stdout.write('\r\n*** EOF\r\n')                        break                    # 发送数据                    sys.stdout.write(x)                    # 刷新数据                    sys.stdout.flush()                except socket.timeout:                    pass            # 没进行一次动作就select就执行返回一次            if sys.stdin in r:                x = sys.stdin.read(1)                if len(x) == 0:                    break #-----------------2.源码修改---------------------#             # 判断用户是否使用回车键                if x == "\r":                    # 将列表字符拼接                    cmd_str="".join(cmd)                    # 获取出用户执行命令                    print("-->",cmd_str)                    # 清空列表                    cmd = []                else:                    # 没有回车就将字符添加列表内                    cmd.append(x) #----------------------------------------------#                chan.send(x)    finally:        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)    # thanks to Mike Looijmans for this codedef windows_shell(chan):    import threading    sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n")            def writeall(sock):        while True:            data = sock.recv(256)            if not data:                sys.stdout.write('\r\n*** EOF ***\r\n\r\n')                sys.stdout.flush()                break            sys.stdout.write(data.decode())            sys.stdout.flush()            writer = threading.Thread(target=writeall, args=(chan,))    writer.start()            try:        while True:            d = sys.stdin.read(1)            if not d:                break            chan.send(d)    except EOFError:        # user hit ^Z or F6        pass

 

转载于:https://www.cnblogs.com/xiangsikai/p/8337344.html

你可能感兴趣的文章
读了曾国藩家书,,心态逐渐平和起来。搞技术的如果缺乏信念的指引,生活会很乏味无聊!...
查看>>
前端javascript 错误 Uncaught SyntaxError: Unexpected token ILLEGAL
查看>>
Selenium WebDriver问题--无法打开Chrome浏览器
查看>>
2017.4.18 Java的Integer与int互转
查看>>
小程序接受返回数组的坑
查看>>
Arduino---HC-05 蓝牙模块
查看>>
构建之法读书笔记02——个人技术和流程
查看>>
解决VS2015安装Android SDK 后文件不全及更新问题
查看>>
辣鸡咯..
查看>>
(2018干货系列一)最新Java学习路线整合
查看>>
django 快速搭建blog
查看>>
Chrome插件:本地程序实现验证码破解(浏览器与本地进程通信)
查看>>
学习的态度!
查看>>
小组成员名单()
查看>>
[Javascirpt] What’s new in JavaScript (Google I/O ’19)
查看>>
[Angular 2] Writing a Simple Angular 2 Component
查看>>
可能会用的到的JQ插件
查看>>
高斯消元模板
查看>>
【GPS】SAP测试GPS模块拿不到sensor数据
查看>>
python 数据类型之列表、元组、字典、集合
查看>>