391 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
			
		
		
	
	
			391 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
#!/usr/bin/python
 | 
						|
#
 | 
						|
# Low-level QEMU shell on top of QMP.
 | 
						|
#
 | 
						|
# Copyright (C) 2009, 2010 Red Hat Inc.
 | 
						|
#
 | 
						|
# Authors:
 | 
						|
#  Luiz Capitulino <lcapitulino@redhat.com>
 | 
						|
#
 | 
						|
# This work is licensed under the terms of the GNU GPL, version 2.  See
 | 
						|
# the COPYING file in the top-level directory.
 | 
						|
#
 | 
						|
# Usage:
 | 
						|
#
 | 
						|
# Start QEMU with:
 | 
						|
#
 | 
						|
# # qemu [...] -qmp unix:./qmp-sock,server
 | 
						|
#
 | 
						|
# Run the shell:
 | 
						|
#
 | 
						|
# $ qmp-shell ./qmp-sock
 | 
						|
#
 | 
						|
# Commands have the following format:
 | 
						|
#
 | 
						|
#    < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
 | 
						|
#
 | 
						|
# For example:
 | 
						|
#
 | 
						|
# (QEMU) device_add driver=e1000 id=net1
 | 
						|
# {u'return': {}}
 | 
						|
# (QEMU)
 | 
						|
 | 
						|
import qmp
 | 
						|
import json
 | 
						|
import ast
 | 
						|
import readline
 | 
						|
import sys
 | 
						|
import pprint
 | 
						|
 | 
						|
class QMPCompleter(list):
    """List of completion candidates usable as a readline completer."""

    def complete(self, text, state):
        """
        Return the candidate number `state` (counting from 0) among the
        entries that start with `text`, in list order.

        Returns None when there is no such candidate.
        """
        candidates = (entry for entry in self if entry.startswith(text))
        for position, candidate in enumerate(candidates):
            if position == state:
                return candidate
        return None
 | 
						|
 | 
						|
class QMPShellError(Exception):
    """Base class for the errors raised by the shell itself."""
 | 
						|
 | 
						|
class QMPShellBadPort(QMPShellError):
    """Raised when the port part of a host:port address is not an integer."""
 | 
						|
 | 
						|
class FuzzyJSON(ast.NodeTransformer):
    '''Node transformer that renames the JSON literals "true/false/null",
    when they appear as plain names in an AST, to the Python spellings
    "True/False/None" so that the tree can be evaluated by Python.'''

    # JSON spelling -> Python spelling
    _RENAMES = {
        'true': 'True',
        'false': 'False',
        'null': 'None',
    }

    def visit_Name(self, node):
        # Names that are not JSON literals are left untouched.
        python_name = self._RENAMES.get(node.id)
        if python_name is not None:
            node.id = python_name
        return node
 | 
						|
 | 
						|
# TODO: QMPShell's interface is a bit ugly (eg. _fill_completion() and
 | 
						|
#       _execute_cmd()). Let's design a better one.
 | 
						|
class QMPShell(qmp.QEMUMonitorProtocol):
 | 
						|
    def __init__(self, address, pp=None):
 | 
						|
        qmp.QEMUMonitorProtocol.__init__(self, self.__get_address(address))
 | 
						|
        self._greeting = None
 | 
						|
        self._completer = None
 | 
						|
        self._pp = pp
 | 
						|
        self._transmode = False
 | 
						|
        self._actions = list()
 | 
						|
 | 
						|
    def __get_address(self, arg):
 | 
						|
        """
 | 
						|
        Figure out if the argument is in the port:host form, if it's not it's
 | 
						|
        probably a file path.
 | 
						|
        """
 | 
						|
        addr = arg.split(':')
 | 
						|
        if len(addr) == 2:
 | 
						|
            try:
 | 
						|
                port = int(addr[1])
 | 
						|
            except ValueError:
 | 
						|
                raise QMPShellBadPort
 | 
						|
            return ( addr[0], port )
 | 
						|
        # socket path
 | 
						|
        return arg
 | 
						|
 | 
						|
    def _fill_completion(self):
 | 
						|
        for cmd in self.cmd('query-commands')['return']:
 | 
						|
            self._completer.append(cmd['name'])
 | 
						|
 | 
						|
    def __completer_setup(self):
 | 
						|
        self._completer = QMPCompleter()
 | 
						|
        self._fill_completion()
 | 
						|
        readline.set_completer(self._completer.complete)
 | 
						|
        readline.parse_and_bind("tab: complete")
 | 
						|
        # XXX: default delimiters conflict with some command names (eg. query-),
 | 
						|
        # clearing everything as it doesn't seem to matter
 | 
						|
        readline.set_completer_delims('')
 | 
						|
 | 
						|
    def __parse_value(self, val):
 | 
						|
        try:
 | 
						|
            return int(val)
 | 
						|
        except ValueError:
 | 
						|
            pass
 | 
						|
 | 
						|
        if val.lower() == 'true':
 | 
						|
            return True
 | 
						|
        if val.lower() == 'false':
 | 
						|
            return False
 | 
						|
        if val.startswith(('{', '[')):
 | 
						|
            # Try first as pure JSON:
 | 
						|
            try:
 | 
						|
                return json.loads(val)
 | 
						|
            except ValueError:
 | 
						|
                pass
 | 
						|
            # Try once again as FuzzyJSON:
 | 
						|
            try:
 | 
						|
                st = ast.parse(val, mode='eval')
 | 
						|
                return ast.literal_eval(FuzzyJSON().visit(st))
 | 
						|
            except SyntaxError:
 | 
						|
                pass
 | 
						|
            except ValueError:
 | 
						|
                pass
 | 
						|
        return val
 | 
						|
 | 
						|
    def __cli_expr(self, tokens, parent):
 | 
						|
        for arg in tokens:
 | 
						|
            (key, _, val) = arg.partition('=')
 | 
						|
            if not val:
 | 
						|
                raise QMPShellError("Expected a key=value pair, got '%s'" % arg)
 | 
						|
 | 
						|
            value = self.__parse_value(val)
 | 
						|
            optpath = key.split('.')
 | 
						|
            curpath = []
 | 
						|
            for p in optpath[:-1]:
 | 
						|
                curpath.append(p)
 | 
						|
                d = parent.get(p, {})
 | 
						|
                if type(d) is not dict:
 | 
						|
                    raise QMPShellError('Cannot use "%s" as both leaf and non-leaf key' % '.'.join(curpath))
 | 
						|
                parent[p] = d
 | 
						|
                parent = d
 | 
						|
            if optpath[-1] in parent:
 | 
						|
                if type(parent[optpath[-1]]) is dict:
 | 
						|
                    raise QMPShellError('Cannot use "%s" as both leaf and non-leaf key' % '.'.join(curpath))
 | 
						|
                else:
 | 
						|
                    raise QMPShellError('Cannot set "%s" multiple times' % key)
 | 
						|
            parent[optpath[-1]] = value
 | 
						|
 | 
						|
    def __build_cmd(self, cmdline):
 | 
						|
        """
 | 
						|
        Build a QMP input object from a user provided command-line in the
 | 
						|
        following format:
 | 
						|
 | 
						|
            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
 | 
						|
        """
 | 
						|
        cmdargs = cmdline.split()
 | 
						|
 | 
						|
        # Transactional CLI entry/exit:
 | 
						|
        if cmdargs[0] == 'transaction(':
 | 
						|
            self._transmode = True
 | 
						|
            cmdargs.pop(0)
 | 
						|
        elif cmdargs[0] == ')' and self._transmode:
 | 
						|
            self._transmode = False
 | 
						|
            if len(cmdargs) > 1:
 | 
						|
                raise QMPShellError("Unexpected input after close of Transaction sub-shell")
 | 
						|
            qmpcmd = { 'execute': 'transaction',
 | 
						|
                       'arguments': { 'actions': self._actions } }
 | 
						|
            self._actions = list()
 | 
						|
            return qmpcmd
 | 
						|
 | 
						|
        # Nothing to process?
 | 
						|
        if not cmdargs:
 | 
						|
            return None
 | 
						|
 | 
						|
        # Parse and then cache this Transactional Action
 | 
						|
        if self._transmode:
 | 
						|
            finalize = False
 | 
						|
            action = { 'type': cmdargs[0], 'data': {} }
 | 
						|
            if cmdargs[-1] == ')':
 | 
						|
                cmdargs.pop(-1)
 | 
						|
                finalize = True
 | 
						|
            self.__cli_expr(cmdargs[1:], action['data'])
 | 
						|
            self._actions.append(action)
 | 
						|
            return self.__build_cmd(')') if finalize else None
 | 
						|
 | 
						|
        # Standard command: parse and return it to be executed.
 | 
						|
        qmpcmd = { 'execute': cmdargs[0], 'arguments': {} }
 | 
						|
        self.__cli_expr(cmdargs[1:], qmpcmd['arguments'])
 | 
						|
        return qmpcmd
 | 
						|
 | 
						|
    def _print(self, qmp):
 | 
						|
        jsobj = json.dumps(qmp)
 | 
						|
        if self._pp is not None:
 | 
						|
            self._pp.pprint(jsobj)
 | 
						|
        else:
 | 
						|
            print str(jsobj)
 | 
						|
 | 
						|
    def _execute_cmd(self, cmdline):
 | 
						|
        try:
 | 
						|
            qmpcmd = self.__build_cmd(cmdline)
 | 
						|
        except Exception, e:
 | 
						|
            print 'Error while parsing command line: %s' % e
 | 
						|
            print 'command format: <command-name> ',
 | 
						|
            print '[arg-name1=arg1] ... [arg-nameN=argN]'
 | 
						|
            return True
 | 
						|
        # For transaction mode, we may have just cached the action:
 | 
						|
        if qmpcmd is None:
 | 
						|
            return True
 | 
						|
        if self._verbose:
 | 
						|
            self._print(qmpcmd)
 | 
						|
        resp = self.cmd_obj(qmpcmd)
 | 
						|
        if resp is None:
 | 
						|
            print 'Disconnected'
 | 
						|
            return False
 | 
						|
        self._print(resp)
 | 
						|
        return True
 | 
						|
 | 
						|
    def connect(self):
 | 
						|
        self._greeting = qmp.QEMUMonitorProtocol.connect(self)
 | 
						|
        self.__completer_setup()
 | 
						|
 | 
						|
    def show_banner(self, msg='Welcome to the QMP low-level shell!'):
 | 
						|
        print msg
 | 
						|
        version = self._greeting['QMP']['version']['qemu']
 | 
						|
        print 'Connected to QEMU %d.%d.%d\n' % (version['major'],version['minor'],version['micro'])
 | 
						|
 | 
						|
    def get_prompt(self):
 | 
						|
        if self._transmode:
 | 
						|
            return "TRANS> "
 | 
						|
        return "(QEMU) "
 | 
						|
 | 
						|
    def read_exec_command(self, prompt):
 | 
						|
        """
 | 
						|
        Read and execute a command.
 | 
						|
 | 
						|
        @return True if execution was ok, return False if disconnected.
 | 
						|
        """
 | 
						|
        try:
 | 
						|
            cmdline = raw_input(prompt)
 | 
						|
        except EOFError:
 | 
						|
            print
 | 
						|
            return False
 | 
						|
        if cmdline == '':
 | 
						|
            for ev in self.get_events():
 | 
						|
                print ev
 | 
						|
            self.clear_events()
 | 
						|
            return True
 | 
						|
        else:
 | 
						|
            return self._execute_cmd(cmdline)
 | 
						|
 | 
						|
    def set_verbosity(self, verbose):
 | 
						|
        self._verbose = verbose
 | 
						|
 | 
						|
class HMPShell(QMPShell):
 | 
						|
    def __init__(self, address):
 | 
						|
        QMPShell.__init__(self, address)
 | 
						|
        self.__cpu_index = 0
 | 
						|
 | 
						|
    def __cmd_completion(self):
 | 
						|
        for cmd in self.__cmd_passthrough('help')['return'].split('\r\n'):
 | 
						|
            if cmd and cmd[0] != '[' and cmd[0] != '\t':
 | 
						|
                name = cmd.split()[0] # drop help text
 | 
						|
                if name == 'info':
 | 
						|
                    continue
 | 
						|
                if name.find('|') != -1:
 | 
						|
                    # Command in the form 'foobar|f' or 'f|foobar', take the
 | 
						|
                    # full name
 | 
						|
                    opt = name.split('|')
 | 
						|
                    if len(opt[0]) == 1:
 | 
						|
                        name = opt[1]
 | 
						|
                    else:
 | 
						|
                        name = opt[0]
 | 
						|
                self._completer.append(name)
 | 
						|
                self._completer.append('help ' + name) # help completion
 | 
						|
 | 
						|
    def __info_completion(self):
 | 
						|
        for cmd in self.__cmd_passthrough('info')['return'].split('\r\n'):
 | 
						|
            if cmd:
 | 
						|
                self._completer.append('info ' + cmd.split()[1])
 | 
						|
 | 
						|
    def __other_completion(self):
 | 
						|
        # special cases
 | 
						|
        self._completer.append('help info')
 | 
						|
 | 
						|
    def _fill_completion(self):
 | 
						|
        self.__cmd_completion()
 | 
						|
        self.__info_completion()
 | 
						|
        self.__other_completion()
 | 
						|
 | 
						|
    def __cmd_passthrough(self, cmdline, cpu_index = 0):
 | 
						|
        return self.cmd_obj({ 'execute': 'human-monitor-command', 'arguments':
 | 
						|
                              { 'command-line': cmdline,
 | 
						|
                                'cpu-index': cpu_index } })
 | 
						|
 | 
						|
    def _execute_cmd(self, cmdline):
 | 
						|
        if cmdline.split()[0] == "cpu":
 | 
						|
            # trap the cpu command, it requires special setting
 | 
						|
            try:
 | 
						|
                idx = int(cmdline.split()[1])
 | 
						|
                if not 'return' in self.__cmd_passthrough('info version', idx):
 | 
						|
                    print 'bad CPU index'
 | 
						|
                    return True
 | 
						|
                self.__cpu_index = idx
 | 
						|
            except ValueError:
 | 
						|
                print 'cpu command takes an integer argument'
 | 
						|
                return True
 | 
						|
        resp = self.__cmd_passthrough(cmdline, self.__cpu_index)
 | 
						|
        if resp is None:
 | 
						|
            print 'Disconnected'
 | 
						|
            return False
 | 
						|
        assert 'return' in resp or 'error' in resp
 | 
						|
        if 'return' in resp:
 | 
						|
            # Success
 | 
						|
            if len(resp['return']) > 0:
 | 
						|
                print resp['return'],
 | 
						|
        else:
 | 
						|
            # Error
 | 
						|
            print '%s: %s' % (resp['error']['class'], resp['error']['desc'])
 | 
						|
        return True
 | 
						|
 | 
						|
    def show_banner(self):
 | 
						|
        QMPShell.show_banner(self, msg='Welcome to the HMP shell!')
 | 
						|
 | 
						|
def die(msg):
    """Write msg to stderr, prefixed with 'ERROR: ', and exit with status 1."""
    report = 'ERROR: %s\n' % msg
    sys.stderr.write(report)
    raise SystemExit(1)
 | 
						|
 | 
						|
def fail_cmdline(option=None):
    """
    Print the usage line on stderr and exit with status 1.

    When option is given, an error line naming the offending option is
    printed first.
    """
    if option:
        sys.stderr.write('ERROR: bad command-line option \'%s\'\n' % option)
    # The usage line names the script the way the file header does.
    sys.stderr.write('qmp-shell [ -v ] [ -p ] [ -H ] < UNIX socket path> | < TCP address:port >\n')
    sys.exit(1)
 | 
						|
 | 
						|
def main():
    """
    Parse the command line, connect to QEMU and run the interactive loop.

    Options: -H selects the HMP shell (must come before the address),
    -p enables pretty printing, -v enables verbose output. The single
    remaining argument is the socket path or host:port address.
    """
    shell = None
    target = ''
    use_hmp = False
    printer = None
    be_verbose = False

    try:
        for opt in sys.argv[1:]:
            if opt == "-v":
                be_verbose = True
                continue
            if opt == "-p":
                if printer is not None:
                    fail_cmdline(opt)
                printer = pprint.PrettyPrinter(indent=4)
                continue
            if opt == "-H":
                # Too late to switch shells once one has been created.
                if shell is not None:
                    fail_cmdline(opt)
                use_hmp = True
                continue

            # Anything else is the address; only one is accepted.
            if shell is not None:
                fail_cmdline(opt)
            shell = HMPShell(opt) if use_hmp else QMPShell(opt, printer)
            target = opt

        if shell is None:
            fail_cmdline()
    except QMPShellBadPort:
        die('bad port number in command-line')

    try:
        shell.connect()
    except qmp.QMPConnectError:
        die('Didn\'t get QMP greeting message')
    except qmp.QMPCapabilitiesError:
        die('Could not negotiate capabilities')
    except shell.error:
        die('Could not connect to %s' % target)

    shell.show_banner()
    shell.set_verbosity(be_verbose)

    # Keep reading commands until the shell reports that it should stop.
    keep_going = True
    while keep_going:
        keep_going = shell.read_exec_command(shell.get_prompt())
    shell.close()
 | 
						|
 | 
						|
# Run the shell only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
 |