# Source code for ansible.module_utils.shell

#
# (c) 2015 Peter Sprygada, <psprygada@ansible.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.
#
import re
import socket

# py2 vs py3; replace with six via ziploader
try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO

try:
    import paramiko
    from paramiko.ssh_exception import AuthenticationException
    HAS_PARAMIKO = True
except ImportError:
    HAS_PARAMIKO = False

from ansible.module_utils.basic import get_exception

# Matches the terminal control sequence ESC [ ? 1 h ESC = ; Shell.strip()
# substitutes it with the empty string in text received from the device.
ANSI_RE = re.compile(r'(\x1b\[\?1h\x1b=)')

# Default prompt patterns.  Each one is anchored at the end of the text
# ("$" after optional trailing whitespace), so a match means the received
# output currently ends with something that looks like a CLI prompt.
CLI_PROMPTS_RE = [
    # A name (letter, then letters/digits/hyphens) followed by one prompt
    # character.
    # NOTE(review): inside a character class "|" is a literal, so "[>|#|%]"
    # also accepts "|" as a prompt character -- confirm this is intended.
    re.compile(r'[\r\n]?[a-zA-Z]{1}[a-zA-Z0-9-]*[>|#|%](?:\s*)$'),
    # A name followed by a parenthesised section and "#",
    # e.g. "name(anything)#".
    re.compile(r'[\r\n]?[a-zA-Z]{1}[a-zA-Z0-9-]*\(.+\)#(?:\s*)$')
]

# Default error patterns.  Shell.read() raises ShellError as soon as any of
# these is found anywhere in the text it is given.
CLI_ERRORS_RE = [
    re.compile(r"% ?Error"),
    # "% " followed by a word at the start of any line (re.M).
    re.compile(r"^% \w+", re.M),
    re.compile(r"% ?Bad secret"),
    re.compile(r"invalid input", re.I),
    re.compile(r"(?:incomplete|ambiguous) command", re.I),
    re.compile(r"connection timed out", re.I),
    re.compile(r"[^\r\n]+ not found", re.I),
    # NOTE(review): "'[^']'" matches exactly ONE character between the
    # quotes; "'[^']+'" may have been intended -- confirm before changing.
    re.compile(r"'[^']' +returned error code: ?\d+"),
    # The two patterns below are case-sensitive (no re.I flag).
    re.compile(r"syntax error"),
    re.compile(r"unknown command")
]

def to_list(val):
    """Normalise *val* into a new list.

    ``None`` becomes an empty list, a list or tuple is copied into a fresh
    list, and any other value is wrapped in a single-element list.
    """
    if val is None:
        return []
    if isinstance(val, (list, tuple)):
        return list(val)
    return [val]
class ShellError(Exception):
    """Error raised for failures while talking to the remote shell.

    Attributes:
        message: the human-readable error text (also passed to Exception).
        command: the command associated with the failure, or None.
    """

    def __init__(self, msg, command=None):
        # Record the extra context first, then let Exception store the text
        # in ``args`` so ``str(exc)`` yields the message.
        self.command = command
        self.message = msg
        super(ShellError, self).__init__(msg)
class Command(object):
    """A command to send, optionally paired with an interactive prompt/answer.

    Attributes:
        command: the command text; this is what ``str()`` returns.
        prompt: pattern(s) to look for in the output while the command runs.
        response: answer(s) to send when the matching prompt is seen.
    """

    def __init__(self, command, prompt=None, response=None):
        self.response = response
        self.prompt = prompt
        self.command = command

    def __str__(self):
        text = self.command
        return text
class Shell(object):
    """Interactive shell session over SSH (paramiko) to a CLI device.

    Output is read until one of the prompt patterns matches the end of the
    received text; any error pattern found in the output raises ShellError.

    Args:
        prompts_re: list of compiled regexes that identify a prompt.
            Defaults to CLI_PROMPTS_RE.
        errors_re: list of compiled regexes that identify an error in the
            output.  Defaults to CLI_ERRORS_RE.
        kickstart: when true, open() sends a newline right after the shell
            is invoked.
    """

    def __init__(self, prompts_re=None, errors_re=None, kickstart=True):
        self.ssh = None
        self.shell = None
        self.kickstart = kickstart
        self._matched_prompt = None
        self.prompts = prompts_re or CLI_PROMPTS_RE
        self.errors = errors_re or CLI_ERRORS_RE

    def open(self, host, port=22, username=None, password=None, timeout=10,
             key_filename=None, pkey=None, look_for_keys=None,
             allow_agent=False):
        """Connect to *host*, start an interactive shell and read up to the
        first prompt.

        The connection arguments are passed straight through to
        ``paramiko.SSHClient.connect``; *timeout* is also applied to the
        shell channel.

        Raises:
            ShellError: paramiko is not importable, the host name cannot be
                resolved, or authentication fails.
        """
        if not HAS_PARAMIKO:
            # Without this check the first use of ``paramiko`` below would
            # surface as a NameError.
            raise ShellError('paramiko is required but does not appear to '
                             'be installed')

        self.ssh = paramiko.SSHClient()
        # NOTE(security): unknown host keys are accepted automatically.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        # unless explicitly set, disable look for keys if a password is
        # present. this changes the default search order paramiko implements
        if look_for_keys is None:
            look_for_keys = password is None

        try:
            self.ssh.connect(host, port=port, username=username,
                             password=password, timeout=timeout,
                             look_for_keys=look_for_keys, pkey=pkey,
                             key_filename=key_filename,
                             allow_agent=allow_agent)
            self.shell = self.ssh.invoke_shell()
            self.shell.settimeout(timeout)
        except socket.gaierror:
            # Release the half-opened client before reporting the failure.
            self.close()
            raise ShellError("unable to resolve host name")
        except AuthenticationException:
            self.close()
            raise ShellError('Unable to authenticate to remote device')

        if self.kickstart:
            self.shell.sendall("\n")

        # Consume everything up to (and including) the first prompt.
        self.receive()

    def strip(self, data):
        """Return *data* with every ANSI_RE match removed."""
        return ANSI_RE.sub('', data)

    def receive(self, cmd=None):
        """Read from the shell until a prompt is seen; return the output.

        Data is read in chunks of up to 200 bytes.  After each chunk the last
        200 characters received so far are checked for interactive prompts
        (when *cmd* is a Command), for errors and for a CLI prompt.

        Args:
            cmd: the command whose output is being read, or None.

        Returns:
            The received text after strip() and sanitize().

        Raises:
            ShellError: an error pattern matched, or the channel was closed
                before a prompt was seen.  ``command`` is set to *cmd*.
        """
        import codecs

        # An incremental decoder keeps a multi-byte character that is split
        # across two reads intact.
        decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
        chunks = []
        tail = ''

        while True:
            data = self.shell.recv(200)
            if not data:
                # A zero-length read means the channel is closed; looping
                # again would spin forever without ever seeing a prompt.
                raise ShellError('connection closed by remote device before '
                                 'a prompt was received', cmd)

            # Under Python 3 the channel hands back bytes; under Python 2
            # ``str`` is already the byte string type and is kept as is.
            if not isinstance(data, str):
                data = decoder.decode(data)

            chunks.append(data)
            # Only the most recent 200 characters are inspected each round.
            tail = (tail + data)[-200:]
            window = self.strip(tail)

            if isinstance(cmd, Command):
                self.handle_input(window, prompt=cmd.prompt,
                                  response=cmd.response)

            try:
                if self.read(window):
                    resp = self.strip(''.join(chunks))
                    return self.sanitize(cmd, resp)
            except ShellError:
                # Attach the offending command before propagating.
                exc = get_exception()
                exc.command = cmd
                raise

    def send(self, commands):
        """Send one or more commands and collect their responses.

        Args:
            commands: a single command or a list/tuple of commands; each is
                converted with ``str()`` and terminated with a carriage
                return.

        Returns:
            A list with one response string per command, in order.

        Raises:
            ShellError: a socket timeout occurred while sending a command or
                waiting for its output.
        """
        responses = list()
        cmd = None
        try:
            for command in to_list(commands):
                cmd = '%s\r' % str(command)
                self.shell.sendall(cmd)
                responses.append(self.receive(command))
        except socket.timeout:
            raise ShellError("timeout trying to send command", cmd)
        return responses

    def close(self):
        """Close the shell channel and the SSH client, if they exist.

        Safe to call on a Shell that was never (successfully) opened.
        """
        if self.shell is not None:
            self.shell.close()
        if self.ssh is not None:
            self.ssh.close()

    def handle_input(self, resp, prompt, response):
        """Answer interactive prompts found in *resp*.

        *prompt* and *response* are paired up positionally; for every prompt
        pattern that matches *resp* the corresponding response is sent,
        followed by a carriage return.  Nothing happens when either argument
        is empty.
        """
        if not prompt or not response:
            return

        prompt = to_list(prompt)
        response = to_list(response)

        for pr, ans in zip(prompt, response):
            match = pr.search(resp)
            if match:
                cmd = '%s\r' % ans
                self.shell.sendall(cmd)

    def sanitize(self, cmd, resp):
        """Drop the echoed command and prompt lines from *resp*.

        Args:
            cmd: the command that produced *resp*, or None.
            resp: the raw response text.

        Returns:
            The remaining lines joined with ``"\\n"``.

        Raises:
            ShellError: a line matches one of the error patterns.
        """
        # With no command (or an empty one) there is no echo to remove;
        # comparing against str(None) would wrongly drop lines that start
        # with "None", and an empty string is a prefix of every line.
        echo = str(cmd) if cmd is not None else ''

        cleaned = []
        for line in resp.splitlines():
            if (echo and line.startswith(echo)) or self.read(line):
                continue
            cleaned.append(line)
        return "\n".join(cleaned)

    def read(self, response):
        """Check *response* for errors and for a prompt.

        Returns:
            True when a prompt pattern matches (the matched text is stored
            in ``_matched_prompt``), False otherwise.

        Raises:
            ShellError: an error pattern matches *response*.
        """
        for regex in self.errors:
            if regex.search(response):
                raise ShellError('matched error in response: %s' % response)

        for regex in self.prompts:
            match = regex.search(response)
            if match:
                self._matched_prompt = match.group()
                return True

        return False