SSH(Secure Shell)协议是一种加密的网络传输协议,使得在不安全的网络环境中可以安全的执行远程登录、远程命令执行和数据传输等操作。
SSH 协议的组成可以大致分成三个模块:
传输层协议比较偏向于基础设施类协议,本文中我们所说的定制集中在用户认证协议和连接层协议的定制。
本文中的所有代码基于 Python3 + asyncssh 库进行实现。
在基于 asyncssh 库实现 ssh 服务时,应用程序需要继承并实现 SSHServer 类,
常见的需要重载的方法如:
connection_made
begin_auth
connection_lost
xxxx_supported
validate_xxxx
SSHServer 类主要控制客户端与SSH服务之间的认证行为。
在认证完成后,客户端与服务之间的连接行为,需要通过 SSHServerSession 类来完成,业务需要继承这个类并重载相关方法如:
connection_made
pty_requested
shell_requested
exec_requested
subsystem_requested
session_started
data_received
还有一点需要注意的是,在阅读 asyncssh 的文档以及源码时,发现有部分错误,详情可以参考:关于 SSHServerSession 的 Issue
本文主要侧重于三种比较常见的认证方式:
我们在使用 ssh 命令行时,可以指定认证方式的优先级:
命令行工具使用 -o 参数指定认证方式的优先级
none -- 不需要认证
keyboard-interactive -- 键盘交互式认证
publickey -- 公钥认证
password -- 密码认证
例如:
ssh -o PreferredAuthentications=none,keyboard-interactive, publickey, password user@example.com
用户名密码认证主要有两方面的功能:
import asyncio
import json
import sys
import time
from typing import Any, Dict, Optional
import asyncssh
from asyncssh.misc import MaybeAwait, PasswordChangeRequired, PermissionDenied
class demo_ssh_server(asyncssh.SSHServer):
def __init__(self):
self.AUTH_MAX_RETRIES: int = 2
self.CHANGE_MAX_RETRIES: int = 2
self.password_retried: int = 0
self.update_password_retried: int = 0
self.connected_time = ''
self.auth_method = 'none'
self.auth_username = ''
self.auth_password = ''
self.auth_new_password = ''
self.auth_success: bool = False
def connection_made(self, conn: asyncssh.SSHServerConnection) -> None:
self.connected_time = time.strftime('%Y-%m-%d %H:%M:%S %p %A %B')
def session_requested(self):
return demo_ssh_session(self)
def connection_lost(self, exc: Optional[Exception]) -> None:
if exc:
print('SSH connection error: ' + str(exc), file = sys.stderr)
else:
print('SSH connection closed.')
def begin_auth(self, username: str) -> bool:
# If the user's password is the empty string, no auth is required
print('begin_auth, username:{}'.format(username))
return True # 需要认证
def password_auth_supported(self) -> bool:
# 开启密码认证
return True
def validate_password(self, username: str, password: str) -> MaybeAwait[bool]:
# 标记当前的认证方式为 password
self.auth_method = 'password'
self.auth_username = username
self.auth_password = password
print('username:{}, password:{}, auth password retried:{}, MAX LIMIT:{}'.format(
username,
password,
self.update_password_retried,
self.AUTH_MAX_RETRIES))
# 如果密码重试次数已经超过了最大限度,则返回失败
if self.password_retried > self.AUTH_MAX_RETRIES or self.update_password_retried > self.CHANGE_MAX_RETRIES:
# 告知客户端当前密码登录验证失败,并将失败信息通过 reason 字段传递给客户端
raise PermissionDenied(reason = 'demo ssh server, password auth failed, user:[{}]'.format(username))
# 用户名不为 test 和 change 时,认为校验失败
if username != 'test' and username != 'change':
self.password_retried += 1
return False
# 校验 username 和 password 是否匹配,如果匹配则返回 True
# 当用户名为 test 时,密码必须为 test
if username == 'test' and password != 'test':
self.password_retried += 1
return False
# 当用户名为 change 时,要求客户端更改密码
if username == 'change' and len(password) > 0:
self.password_retried += 1
raise PasswordChangeRequired(prompt = 'change your password for username:[{}]'.format(username))
self.auth_success = True
return True
def change_password(self, username: str, old_password: str,
new_password: str) -> MaybeAwait[bool]:
print('update password retried:{}, MAX LIMIT:{}'.format(self.update_password_retried, self.CHANGE_MAX_RETRIES))
# 如果重置密码的次数超过上限则返回 False
if self.update_password_retried > self.CHANGE_MAX_RETRIES:
# 告知客户端当前密码登录验证失败,并将失败信息通过 reason 字段传递给客户端
raise PermissionDenied(reason = 'demo ssh server, update password failed, user:[{}]'.format(username))
# 如果老的密码和新的密码一致 或 新密码为空,则要求再次改变密码
if old_password == new_password or len(new_password) <= 0:
self.update_password_retried += 1
raise PasswordChangeRequired('retry change password for [{}]'.format(username))
self.auth_new_password = new_password
# 密码认证且更新了密码
self.auth_method = 'password and updated'
self.auth_success = True
return True
def auth_completed(self) -> None:
print(
'demo ssh server, auth_method:{}, '
'auth_username:{}, auth_password:{}, '
'auth_pwd_retried:{}, changed_pwd_retried:{}, '
'new_password:{}, auth_result:{}'.format(
self.auth_method,
self.auth_username,
self.auth_password,
self.password_retried,
self.update_password_retried,
self.auth_new_password,
self.auth_success))
pass
class demo_ssh_session(asyncssh.SSHServerSession):
def __init__(self, server: demo_ssh_server):
self.server: demo_ssh_server = server
def connection_made(self, channel):
super().connection_made(channel)
ret: Dict[str, Any] = {
'auth_method': self.server.auth_method,
'auth_username': self.server.auth_username,
'auth_password': self.server.auth_password,
'password_retried': self.server.password_retried,
'update_password_retried': self.server.update_password_retried,
'auth_new_password': self.server.auth_new_password,
'auth_succeed': self.server.auth_success,
}
data = json.dumps(ret, ensure_ascii = False, sort_keys = True)
# 当密码校验成功后,客户端与服务端建立了 session,此时回写登录信息给客户端
channel.write("{}\n".format(data))
channel.exit(0)
async def start_server() -> None:
"""
create_server 是一个封装了 listen 方法的协程,
它提供与旧版AsyncSSH的向后兼容性。
唯一的区别是,server_factory参数在此调用中是一个位置参数,
而不是关键字参数或通过SSHServerConnectionOptions对象指定,
这与asyncio.AbstractEventLoop.create_server相似。
:return: None
"""
await asyncssh.create_server(
server_factory = demo_ssh_server,
host = '127.0.0.1',
port = 18822,
server_version = 'DEMO-TEST-AsyncSSH_2.13.1', # 自定义服务器 SSH 协议版本名称
server_host_keys = ['/Users/cyx/.ssh/devcloud_ssh/devcloud_id_rsa'],
)
await asyncio.Event().wait() # 添加一个永远不会触发的事件,使服务器保持运行状态
if __name__ == '__main__':
try:
print(asyncssh.__version__)
asyncio.run(start_server())
except (OSError, asyncssh.Error) as exc:
sys.exit('Error starting server: ' + str(exc))
当使用错误的用户名登陆时:
当使用正确的用户名,先输错密码,最后输入正确密码时:
使用需要改密的用户名时:
交互式认证的流程为,先向用户给出提示,获得客户端的输入,最后根据用户的输入判断认证是否成功。这里可以结合双因子认证来做,比如结合短信验证码或小程序 OTP 码等等。
"""
ssh -o PreferredAuthentications=keyboard-interactive user@example.com
"""
import asyncio
import sys
from typing import Optional
import asyncssh
from asyncssh.auth import KbdIntChallenge, KbdIntResponse
from asyncssh.misc import MaybeAwait
class demo_ssh_server(asyncssh.SSHServer):
def __init__(self):
self.first_auth = False
self.second_auth = False
pass
def connection_lost(self, exc: Optional[Exception]) -> None:
if exc:
print('SSH connection error: ' + str(exc), file = sys.stderr)
else:
print('SSH connection closed.')
def begin_auth(self, username: str) -> MaybeAwait[bool]:
"""
:param username: 认证的用户名
:return: 不需要认证则返回 False,需要认证返回 True
"""
return True
def kbdint_auth_supported(self) -> bool:
"""
:return: 是否支持键盘交互式认证
"""
return True
def get_kbdint_challenge(self, username: str, lang: str,
submethods: str) -> MaybeAwait[KbdIntChallenge]:
"""
:param username: 登录的用户名 username@xxxx
:param lang:
:param submethods:
:return: 这个方法应该返回 True,如果认证应该在没有任何挑战的情况下成功;
返回 False,如果认证应该在没有任何挑战的情况下失败;
返回一个认证挑战,包括
( 挑战名称, 说明, 语言标签, [ ( 提示字符串, 布尔值 ) ] ),
当为该提示输入值时,布尔值表示是否应该回显输入。
"""
self.first_auth = True
print('username:{}, lang:{}, submethods:{}'.format(username, lang, submethods))
# 挑战名称、说明、语言标签、[(提示符字符串、是否回显)]
return (' ChallengeName', 'ChallengeDesc', 'Tag',
[('Input something with echo:', True),
('Input something without echo:', False)])
def validate_kbdint_response(self, username: str, responses: KbdIntResponse) -> MaybeAwait[KbdIntChallenge]:
if self.first_auth and not self.second_auth:
print('username:{}, responses:{}'.format(username, responses))
self.second_auth = True
return ('Second ChallengeName', 'Second ChallengeDesc', 'Second Tag',
[('Input something AGAIN with echo:', True),
('Input something AGAIN without echo:', False)])
if self.first_auth and self.second_auth:
print('AGAIN username:{}, responses:{}'.format(username, responses))
return True
return False
async def start_server() -> None:
"""
create_server 是一个封装了 listen 方法的协程,
它提供与旧版AsyncSSH的向后兼容性。
唯一的区别是,server_factory参数在此调用中是一个位置参数,
而不是关键字参数或通过SSHServerConnectionOptions对象指定,
这与asyncio.AbstractEventLoop.create_server相似。
:return: None
"""
await asyncssh.create_server(
server_factory = demo_ssh_server,
host = '127.0.0.1',
port = 18822,
server_version = 'DEMO-TEST-AsyncSSH_2.13.1', # 自定义服务器 SSH 协议版本名称
server_host_keys = ['/Users/cyx/.ssh/devcloud_ssh/devcloud_id_rsa'],
)
await asyncio.Event().wait() # 添加一个永远不会触发的事件,使服务器保持运行状态
if __name__ == '__main__':
try:
print(asyncssh.__version__)
asyncio.run(start_server())
except (OSError, asyncssh.Error) as exc:
sys.exit('Error starting server: ' + str(exc))
公钥认证方式指的是,客户端提前将公钥种到了服务器中(.ssh/authorized_keys),然后客户端在 ssh 登录时,会直接和 ssh 服务器验证公钥的合法性。
当然,ssh 服务器也可以通过解析 ssh 客户端发过来的公钥数据,进行实时的放行或拦截。
import asyncio
import sys
from typing import Optional
import asyncssh
class demo_ssh_server(asyncssh.SSHServer):
def connection_lost(self, exc: Optional[Exception]) -> None:
if exc:
print('SSH connection error: ' + str(exc), file = sys.stderr)
else:
print('SSH connection closed.')
def public_key_auth_supported(self) -> bool:
return True
async def start_server() -> None:
"""
create_server 是一个封装了 listen 方法的协程,
它提供与旧版AsyncSSH的向后兼容性。
唯一的区别是,server_factory参数在此调用中是一个位置参数,
而不是关键字参数或通过SSHServerConnectionOptions对象指定,
这与asyncio.AbstractEventLoop.create_server相似。
:return: None
服务器从名为 authorized_keys 的文件中读取允许的客户端公钥。这个文件应该包含一行或多行 OpenSSH 格式的公钥
当执行:
ssh -i ~/.ssh/id_rsa -o PreferredAuthentications=publickey test@127.0.0.1 -p 18822
会得到:
channel 0: open failed: connect failed: Session refused
当执行:
ssh -i ~/.ssh/devcloud_ssh/devcloud_id_rsa -o PreferredAuthentications=publickey test@127.0.0.1 -p 18822
会得到:
test@127.0.0.1: Permission denied (publickey).
"""
await asyncssh.create_server(
server_factory = demo_ssh_server,
host = '127.0.0.1',
port = 18822,
server_version = 'DEMO-TEST-AsyncSSH_2.13.1', # 自定义服务器 SSH 协议版本名称
server_host_keys = ['/Users/cyx/.ssh/devcloud_ssh/devcloud_id_rsa'],
authorized_client_keys = '/Users/cyx/.ssh/authorized_keys'
)
await asyncio.Event().wait() # 添加一个永远不会触发的事件,使服务器保持运行状态
if __name__ == '__main__':
try:
print(asyncssh.__version__)
asyncio.run(start_server())
except (OSError, asyncssh.Error) as exc:
sys.exit('Error starting server: ' + str(exc))
此时服务器中的公钥文件为:
客户端分别使用合法的公钥以及非法的公钥访问时,会得到:
"""
SSH 指定私钥文件路径:
ssh -i ~/.ssh/id_rsa -o PreferredAuthentications=publickey test@127.0.0.1 -p 18822
在发起 SSH 连接时,会由这个私钥派生出对应的公钥,并使用派生出的公钥进行用户认证
"""
import asyncio
import hashlib
import sys
from typing import List, Optional
import asyncssh
from asyncssh import SSHKey
from asyncssh.misc import MaybeAwait
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from openssh_key_parse import protocol_binary_to_authorized_key_rsa
class demo_ssh_server(asyncssh.SSHServer):
def __init__(self):
self.auth_client_pub_keys: List[bytes] = []
pass
def begin_auth(self, username: str) -> MaybeAwait[bool]:
"""
预先加载好客户认证需要的密钥对
以 "-----BEGIN OPENSSH PRIVATE KEY-----" 开头的密钥实际上是 OpenSSH 格式的私钥,而不是传统的 PEM 格式。
OpenSSH 格式是 OpenSSH 6.5 及更高版本中引入的新格式。
尽管它们看起来很像 PEM 格式,但它们之间有一些差异。
PEM 格式的密钥文件是以文本形式存储的,可以直接用文本编辑器打开。它们以 "-----BEGIN" 开头,
然后是密钥类型(如 RSA PRIVATE KEY、RSA PUBLIC KEY、PUBLIC KEY 等),接着是 "-----"。
文件结尾处以 "-----END" 开头,后面跟着与开始相同的密钥类型和 "-----"
:param username: 登录的用户名
:return: True 表示需要进行认证,False 标识不需要认证
"""
local_key_path = ['/Users/cyx/.ssh/devcloud_ssh/devcloud_id_rsa',
'/Users/cyx/.ssh/id_rsa']
private_key = None
for p in local_key_path:
with open(p, 'rb') as key_file:
key_file_data = key_file.read()
if key_file_data.startswith('-----BEGIN OPENSSH PRIVATE KEY-----'.encode(encoding = 'utf-8')):
private_key = serialization.load_ssh_private_key(
key_file_data,
password = None, # 如果私钥文件有密码,请在此处提供
backend = default_backend())
if key_file_data.startswith('-----BEGIN RSA PRIVATE KEY-----'.encode(encoding = 'utf-8')):
private_key = serialization.load_pem_private_key(
key_file_data,
password = None,
backend = default_backend()
)
"""
将 encoding 参数设置为 serialization.Encoding.PEM,表示希望以 PEM 编码输出公钥。
将 format 参数设置为 serialization.PublicFormat.SubjectPublicKeyInfo,表示希望输出公钥的 SubjectPublicKeyInfo 格式
这是 PEM 格式公钥的标准格式
将 encoding 参数设置为 serialization.Encoding.DER,表示希望以 DER 编码输出公钥。
将 format 参数设置为 serialization.PublicFormat.SubjectPublicKeyInfo,表示希望输出公钥的 SubjectPublicKeyInfo 格式。
这是 DER 格式公钥的标准格式。
SubjectPublicKeyInfo works only with PEM or DER encoding
RSA 公钥转换为 SSH RSA 公钥格式的二进制数据,需要将 Encoding 和 PublicFormat 填充为:OpenSSH
"""
pub_key_data = private_key.public_key().public_bytes(encoding = serialization.Encoding.OpenSSH,
format = serialization.PublicFormat.OpenSSH)
# print(p, pub_key_data)
self.auth_client_pub_keys.append(pub_key_data)
return True
def connection_lost(self, exc: Optional[Exception]) -> None:
if exc:
print('SSH connection error: ' + str(exc), file = sys.stderr)
else:
print('SSH connection closed.')
def password_auth_supported(self) -> bool:
return False
def kbdint_auth_supported(self) -> bool:
return False
def public_key_auth_supported(self) -> bool:
return True
def validate_public_key(self, username: str, key: SSHKey) -> MaybeAwait[bool]:
print('try login, username:{}, pub key:{}'.format(username, hashlib.md5(key.public_data[:-16]).hexdigest()))
for i in self.auth_client_pub_keys:
if i == protocol_binary_to_authorized_key_rsa(key.public_data):
print(username, '==> pub key matched, login success!!!')
return True
print(username, '--> pub key mis-matched')
return False
async def start_server() -> None:
"""
create_server 是一个封装了 listen 方法的协程,
它提供与旧版AsyncSSH的向后兼容性。
唯一的区别是,server_factory参数在此调用中是一个位置参数,
而不是关键字参数或通过SSHServerConnectionOptions对象指定,
这与asyncio.AbstractEventLoop.create_server相似。
:return: None
"""
await asyncssh.create_server(
server_factory = demo_ssh_server,
host = '127.0.0.1',
port = 18822,
server_version = 'DEMO-TEST-AsyncSSH_2.13.1', # 自定义服务器 SSH 协议版本名称
server_host_keys = ['/Users/cyx/.ssh/devcloud_ssh/devcloud_id_rsa'],
)
await asyncio.Event().wait() # 添加一个永远不会触发的事件,使服务器保持运行状态
if __name__ == '__main__':
try:
print(asyncssh.__version__)
asyncio.run(start_server())
except (OSError, asyncssh.Error) as exc:
sys.exit('Error starting server: ' + str(exc))
其中,ssh 服务器在接收到客户端发送过来的公钥时,需要对公钥数据做一些格式的转化。
def protocol_binary_to_authorized_key_rsa(data: bytes) -> bytes:
# 使用 Paramiko 将二进制数据转换为公钥对象
paramiko_key = paramiko.RSAKey(data = data)
# 将公钥对象转换为 OpenSSH 格式的文本公钥
public_key_openssh = paramiko_key.get_base64()
# 拼接密钥类型(如 "ssh-rsa")和 Base64 编码的公钥数据
openssh_public_key = b"ssh-rsa " + public_key_openssh.encode("utf-8")
# print(openssh_public_key)
return openssh_public_key
def authorized_key_to_protocol_binary(data: bytes) -> bytes:
# 从 OpenSSH 格式文本公钥中提取 Base64 编码的部分
key_base64 = data.decode().split(" ", 2)
key_data = base64.b64decode(key_base64[1])
# 使用 Paramiko 将二进制数据转换为公钥对象
paramiko_key = paramiko.RSAKey(data = key_data)
# 获取二进制 SSH RSA 公钥数据
binary_ssh_rsa_public_key = paramiko_key.asbytes()
# print(binary_ssh_rsa_public_key)
return binary_ssh_rsa_public_key
这里的转化效果可以通过以下的例子来测试:
# 二进制格式的 SSH RSA 公钥,其中包含了公钥的类型、长度和实际数据。这种格式在 SSH 协议中用于传输公钥。
ssh_protocol_binary_rsa = b'\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x03\x01\x00\x01\x00\x00\x01\x01\x00\xb4{+(\xf4\xb0\xa6\xc7\xbb\xac\xe8^u\xc2BR\xe4\x89,\\t\xf7t\x87\x91\x1a\xbf#$\x87\xd3MjA+\xb1Z\xee^\xa3\x0f\xcc\t\x05\xb8\xddt\x1ayt\xcdk\x8eZ\xaa\x1a9\x9fzZ\xa2\x05\xe1\x84\t\xbc=\x9d"\x98\x87\xd5\x80C\xd7\x88\x183\x90:\x85\x0e\xafc*\xabI\xb1\xc8\xf0\x9c~\xbe\xc2\xd8\xb1a\x11\xe5\xbeh\x9e\xa8\x9cU\xf9\xbd\x88\x86\x14\xbd\x97\xf9\xb0\xf7n\xb5\xd2Z>#nGs\x8f\x0bL\xc7J\xcd\x1b\xcft\xeaJ\xd9\x98\xcc\x8969\x16\x7f\x10\xbf\x06\x07\x9b\xb6jbUB\x87j8P\x1cc\x11*I\xb8~\x95\xc7\xa1\x0c\xc7\x1fM\x18*[\xabv\x91\txB\r\x1a9\xe8G\x10\'A\xf7\x0c"\xd8g\xe06\xe4\xf6WFvf\xe5\x8ayz8\x06\xf6q\xa5`\x00O\x86\\\x18\xd5v$\xf8\xacz\xfc^S\xa9V8b\xa7\xb8\xa4{\xbd\xf2\xd0I77\xde\x81\x98\xa9\xd6NB\xf9\x13J\x1b\xe5\x08\xfb\xcf\xdb\x19'
# OpenSSH 格式的文本数据,以 "ssh-rsa" 开头,后面跟着 Base64 编码的 RSA 公钥。这种格式通常用于 authorized_keys 文件。
ssh_authorized_keys_rsa = b'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC0eyso9LCmx7us6F51wkJS5IksXHT3dIeRGr8jJIfTTWpBK7Fa7l6jD8wJBbjddBp5dM1rjlqqGjmfelqiBeGECbw9nSKYh9WAQ9eIGDOQOoUOr2Mqq0mxyPCcfr7C2LFhEeW+aJ6onFX5vYiGFL2X+bD3brXSWj4jbkdzjwtMx0rNG8906krZmMyJNjkWfxC/BgebtmpiVUKHajhQHGMRKkm4fpXHoQzHH00YKlurdpEJeEINGjnoRxAnQfcMIthn4Dbk9ldGdmblinl6OAb2caVgAE+GXBjVdiT4rHr8XlOpVjhip7ike73y0Ek3N96BmKnWTkL5E0ob5Qj7z9sZ'
if __name__ == '__main__':
assert (ssh_authorized_keys_rsa == protocol_binary_to_authorized_key_rsa(ssh_protocol_binary_rsa))
assert (ssh_protocol_binary_rsa == authorized_key_to_protocol_binary(ssh_authorized_keys_rsa))
如果程序可以正常执行完成,则说明数据格式转化成功。
在使用 ssh 的过程中,最常见的场景是,用户通过 ssh 客户端登录进 ssh 服务器后,打开一个 shell 交互的连接,然后用户进行 shell 命令的操作,直到用户或者服务器断开连接。
在用户连接层,其实我们可以自定义 ssh 认证完成后的逻辑,比如服务端给客户端返回一个字符 UI 界面,让用户基于字符 UI 界面进行操作。
完成这种动作,需要在 SSH 服务端,将字符 UI 界面的输入流和输出流与 ssh 客户端session 的输入流和输出流进行对应的绑定。
本文基于 asyncssh 和 prompt_toolkit 进行这种效果的 demo 实现。
源码如下:
import asyncio
import platform
import sys
import time
from typing import Dict, List, Optional
import asyncssh
from asyncssh.misc import MaybeAwait
from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.contrib.ssh import PromptToolkitSSHServer, PromptToolkitSSHSession
from prompt_toolkit.lexers import PygmentsLexer
from prompt_toolkit.renderer import print_formatted_text
from prompt_toolkit.shortcuts import print_formatted_text, ProgressBar
from prompt_toolkit.shortcuts.dialogs import input_dialog, yes_no_dialog
from prompt_toolkit.shortcuts.prompt import PromptSession
from pygments.lexers.html import HtmlLexer
def sys_info() -> Dict[str, str]:
return {
'PythonVersion': sys.version,
'ApiVersion': sys.api_version,
'OSPlatform': sys.platform,
'OSProcessor': platform.platform(),
'BytesEndian': sys.byteorder
}
class ssh_funny_tool(PromptToolkitSSHServer):
def __init__(self, interact):
super().__init__(interact)
self.server_info = sys_info()
self.connected_time: str = ''
def connection_made(self, conn: asyncssh.SSHServerConnection) -> None:
self.connected_time = time.strftime('%Y-%m-%d %H:%M:%S %p %A %B')
def connection_lost(self, exc: Optional[Exception]) -> None:
if exc:
print('SSH connection error: ' + str(exc), file = sys.stderr)
else:
print('SSH connection closed.')
def begin_auth(self, username: str) -> MaybeAwait[bool]:
# 不需要认证
print(username, self.connected_time)
return False
animal_completer = WordCompleter(
[
"alligator",
"ant",
"ape",
"bat",
"bear",
"beaver",
"bee",
"bison",
"butterfly",
"cat",
"chicken",
"crocodile",
"dinosaur",
"dog",
"dolphin",
"dove",
"duck",
"eagle",
"elephant",
"fish",
"goat",
"gorilla",
"kangaroo",
"leopard",
"lion",
"mouse",
"rabbit",
"rat",
"snake",
"spider",
"turkey",
"turtle",
],
ignore_case = True,
)
async def interact(ssh_session: PromptToolkitSSHSession) -> None:
"""
The application interaction.
This will run automatically in a prompt_toolkit AppSession, which means
that any prompt_toolkit application (dialogs, prompts, etc...) will use the
SSH channel for input and output.
"""
prompt_session = PromptSession()
# Alias 'print_formatted_text', so that 'print' calls go to the SSH client.
print = print_formatted_text
print("We will be running a few prompt_toolkit applications through this ")
print("SSH connection.\n")
# Simple progress bar.
with ProgressBar() as pb:
for i in pb(range(50)):
await asyncio.sleep(0.1)
# Normal prompt.
text = await prompt_session.prompt_async("(normal prompt) Type something: ")
print("You typed", text)
# Prompt with auto completion.
text = await prompt_session.prompt_async(
"(autocompletion) Type an animal: ", completer = animal_completer
)
print("You typed", text)
# prompt with syntax highlighting.
text = await prompt_session.prompt_async(
"(HTML syntax highlighting) Type something: ", lexer = PygmentsLexer(HtmlLexer)
)
print("You typed", text)
# Show yes/no dialog.
await prompt_session.prompt_async("Showing yes/no dialog... [ENTER]")
user_choice = await yes_no_dialog("Yes/no dialog", "Running over asyncssh").run_async()
print('user choice:{}'.format(user_choice))
# Show input dialog
await prompt_session.prompt_async("Showing input dialog... [ENTER]")
user_input = await input_dialog("Input dialog", "Running over asyncssh").run_async()
print('user input:{}'.format(user_input))
async def start_server() -> None:
"""
create_server 是一个封装了 listen 方法的协程,
它提供与旧版AsyncSSH的向后兼容性。
唯一的区别是,server_factory参数在此调用中是一个位置参数,
而不是关键字参数或通过SSHServerConnectionOptions对象指定,
这与asyncio.AbstractEventLoop.create_server相似。
:return: None
"""
await asyncssh.create_server(
server_factory = lambda: ssh_funny_tool(interact),
host = '127.0.0.1',
port = 18822,
server_version = 'DEMO-TEST-AsyncSSH_2.13.1', # 自定义服务器 SSH 协议版本名称
server_host_keys = ['/Users/cyx/.ssh/devcloud_ssh/devcloud_id_rsa'],
)
await asyncio.Event().wait() # 添加一个永远不会触发的事件,使服务器保持运行状态
if __name__ == '__main__':
try:
print(asyncssh.__version__)
asyncio.run(start_server())
except (OSError, asyncssh.Error) as exc:
sys.exit('Error starting server: ' + str(exc))
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。