paramiko 源码修改
paramiko主要用来实现ssh客户端、服务端链接,上一节我们说到了堡垒机,堡垒机内有一个需求是“用户行为审计”,在这里我们就可以通过修改paramiko内文件的源码来实现相关要求。
paramiko 源码包安装测试
- 下载地址: 下载源码包paramiko
- 解压源码包 paramiko-master.zip
- 使用目录:paramiko-master\paramiko-master\demos
- python3启动:python paramiko-master\paramiko-master\demos\demos.py
注:paramiko demos.py启动 不支持python3 ,由于python3默认是bays类型,需要修改源码。
\demos\interactive.py", line 84, in writeall sys.stdout.write(data)TypeError: write() argument must be str, not bytes
修改源代码(demos.py 添加 python3 支持)
注:interactive.py文件内(77~85 行)
def writeall(sock): while True: data = sock.recv(256) if not data: sys.stdout.write('\r\n*** EOF ***\r\n\r\n') sys.stdout.flush() break # 修改为编译为UTF-8 sys.stdout.write(data.decode()) sys.stdout.flush()
C:\Users\Administrator\demos>python3 demo.pyHostname: 192.168.1.100*** Unable to open host keys file*** WARNING: Unknown host key!Username [Administrator]: rootAuth by (p)assword, (r)sa key, or (d)ss key? [p] pPassword for root@192.168.1.100:*** Here we go!Line-buffered terminal emulation. Press F6 or ^Z to send EOF.Last login: Mon Jan 22 11:24:35 2018 from 192.168.1.150[root@localhost ~]# 注:登陆成功
修改源码(实现抓取用户操作指令)
注:paramiko-master\paramiko-master\demos\demos.py 文件源码、简要说明
import base64from binascii import hexlifyimport getpassimport osimport selectimport socketimport sysimport timeimport tracebackfrom paramiko.py3compat import inputimport paramikotry: import interactiveexcept ImportError: from . import interactivedef agent_auth(transport, username): """ Attempt to authenticate to the given transport using any of the private keys available from an SSH agent. """ agent = paramiko.Agent() agent_keys = agent.get_keys() if len(agent_keys) == 0: return for key in agent_keys: # 尝试将本地key验证,验证不了就用密码 print('Trying ssh-agent key %s' % hexlify(key.get_fingerprint())) try: transport.auth_publickey(username, key) print('... success!') return except paramiko.SSHException: print('... nope.')def manual_auth(username, hostname): default_auth = 'p' auth = input('Auth by (p)assword, (r)sa key, or (d)ss key? [%s] ' % default_auth) if len(auth) == 0: auth = default_auth # r非对称秘钥 if auth == 'r': default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_rsa') path = input('RSA key [%s]: ' % default_path) if len(path) == 0: path = default_path try: key = paramiko.RSAKey.from_private_key_file(path) except paramiko.PasswordRequiredException: password = getpass.getpass('RSA key password: ') key = paramiko.RSAKey.from_private_key_file(path, password) t.auth_publickey(username, key) # 使用DSA 秘钥 elif auth == 'd': default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_dsa') path = input('DSS key [%s]: ' % default_path) if len(path) == 0: path = default_path try: key = paramiko.DSSKey.from_private_key_file(path) except paramiko.PasswordRequiredException: password = getpass.getpass('DSS key password: ') key = paramiko.DSSKey.from_private_key_file(path, password) t.auth_publickey(username, key) else: pw = getpass.getpass('Password for %s@%s: ' % (username, hostname)) t.auth_password(username, pw)# setup loggingparamiko.util.log_to_file('demo.log')username = ''if len(sys.argv) > 1: hostname = sys.argv[1] if hostname.find('@') >= 0: username, hostname = hostname.split('@')else: # 输入IP hostname = input('Hostname: ')if len(hostname) == 0: # 不输入打印错误 print('*** Hostname required.') sys.exit(1)# 端口号22port = 22if hostname.find(':') >= 0: hostname, portstr = hostname.split(':') port = int(portstr)# now connecttry: # 创建socket对象 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 链接IP端口号 sock.connect((hostname, port))except Exception as e: # 登陆错误就报错 print('*** Connect failed: ' + str(e)) # 打印错误行 traceback.print_exc() sys.exit(1)try: # ssh实例化传入socket t = paramiko.Transport(sock) try: t.start_client() except paramiko.SSHException: print('*** SSH negotiation failed.') sys.exit(1) try: keys = paramiko.util.load_host_keys(os.path.expanduser('~/.ssh/known_hosts')) except IOError: try: # 生成rsa 防止证书校验 keys = paramiko.util.load_host_keys(os.path.expanduser('~/ssh/known_hosts')) except IOError: print('*** Unable to open host keys file') keys = {} # check server's host key -- this is important. key = t.get_remote_server_key() if hostname not in keys: print('*** WARNING: Unknown host key!') elif key.get_name() not in keys[hostname]: print('*** WARNING: Unknown host key!') elif keys[hostname][key.get_name()] != key: print('*** WARNING: Host key has changed!!!') sys.exit(1) else: print('*** Host key OK.') # get username if username == '': # getuser取出当前用户名 default_username = getpass.getuser() username = input('Username [%s]: ' % default_username) if len(username) == 0: username = default_username agent_auth(t, username) # key验证 没通过继续往下走 if not t.is_authenticated(): manual_auth(username, hostname) if not t.is_authenticated(): print('*** Authentication failed. :(') # 认证成功就关闭 t.close() sys.exit(1) # ssh_session交互 chan = t.open_session() # get终端 vt100 终端模式 chan.get_pty() chan.invoke_shell() print('*** Here we go!\n') # 交互 # chan是双方打开的一个通道 # 调用interactive文件 interactive.interactive_shell(chan) chan.close() t.close()# 表村错误数据关闭链接except Exception as e: print('*** Caught exception: ' + str(e.__class__) + ': ' + str(e)) traceback.print_exc() try: t.close() except: pass sys.exit(1)
注:paramiko-master\paramiko-master\demos\interactive.py 文件源码修改,实现用户审计
import socketimport sysfrom paramiko.py3compat import u# windows does not have termios...try: # 不出错代表这是一个linux系统 import termios import tty has_termios = Trueexcept ImportError: has_termios = Falsedef interactive_shell(chan): if has_termios: # linux链接 posix_shell(chan) else: windows_shell(chan)# chan是传入的通道def posix_shell(chan): import select oldtty = termios.tcgetattr(sys.stdin) try: tty.setraw(sys.stdin.fileno()) tty.setcbreak(sys.stdin.fileno()) chan.settimeout(0.0)#-----------------1.源码修改---------------------# # 创建列表用来存放客户端输入命令cmd = [] #----------------------------------------------# while True: # stdin标准输入,stdout标准输出,标准错误 r, w, e = select.select([chan, sys.stdin], [], []) # 如果chan在r里面就代表有数据过来了 if chan in r: try: # 数据过来就收数据 x = u(chan.recv(1024)) # 如果数据等于0,代表断开链接 if len(x) == 0: sys.stdout.write('\r\n*** EOF\r\n') break # 发送数据 sys.stdout.write(x) # 刷新数据 sys.stdout.flush() except socket.timeout: pass # 没进行一次动作就select就执行返回一次 if sys.stdin in r: x = sys.stdin.read(1) if len(x) == 0: break #-----------------2.源码修改---------------------# # 判断用户是否使用回车键 if x == "\r": # 将列表字符拼接 cmd_str="".join(cmd) # 获取出用户执行命令 print("-->",cmd_str) # 清空列表 cmd = [] else: # 没有回车就将字符添加列表内 cmd.append(x) #----------------------------------------------# chan.send(x) finally: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty) # thanks to Mike Looijmans for this codedef windows_shell(chan): import threading sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n") def writeall(sock): while True: data = sock.recv(256) if not data: sys.stdout.write('\r\n*** EOF ***\r\n\r\n') sys.stdout.flush() break sys.stdout.write(data.decode()) sys.stdout.flush() writer = threading.Thread(target=writeall, args=(chan,)) writer.start() try: while True: d = sys.stdin.read(1) if not d: break chan.send(d) except EOFError: # user hit ^Z or F6 pass