@@ -80,9 +80,11 @@ luks_default_key_secret_opt = 'key-secret=keysec0'
def qemu_img(*args):
'''Run qemu-img and return the exit code'''
devnull = open('/dev/null', 'r+')
- exitcode = subprocess.call(qemu_img_args + list(args), stdin=devnull, stdout=devnull)
+ exitcode = subprocess.call(qemu_img_args + list(args),
+ stdin=devnull, stdout=devnull)
if exitcode < 0:
- sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
+ sys.stderr.write('qemu-img received signal %i: %s\n'
+ % (-exitcode, ' '.join(qemu_img_args + list(args))))
return exitcode
def ordered_qmp(qmsg, conv_keys=True):
@@ -121,7 +123,8 @@ def qemu_img_verbose(*args):
'''Run qemu-img without suppressing its output and return the exit code'''
exitcode = subprocess.call(qemu_img_args + list(args))
if exitcode < 0:
- sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
+ sys.stderr.write('qemu-img received signal %i: %s\n'
+ % (-exitcode, ' '.join(qemu_img_args + list(args))))
return exitcode
def qemu_img_pipe(*args):
@@ -132,7 +135,8 @@ def qemu_img_pipe(*args):
universal_newlines=True)
exitcode = subp.wait()
if exitcode < 0:
- sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
+ sys.stderr.write('qemu-img received signal %i: %s\n'
+ % (-exitcode, ' '.join(qemu_img_args + list(args))))
return subp.communicate()[0]
def qemu_img_log(*args):
@@ -162,7 +166,8 @@ def qemu_io(*args):
universal_newlines=True)
exitcode = subp.wait()
if exitcode < 0:
- sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args)))
+ sys.stderr.write('qemu-io received signal %i: %s\n'
+ % (-exitcode, ' '.join(args)))
return subp.communicate()[0]
def qemu_io_log(*args):
@@ -284,10 +289,13 @@ win32_re = re.compile(r"\r")
def filter_win32(msg):
return win32_re.sub("", msg)
-qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)")
+qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
+ r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
+ r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
msg = filter_win32(msg)
- return qemu_io_re.sub("X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)", msg)
+ return qemu_io_re.sub("X ops; XX:XX:XX.X "
+ "(XXX YYY/sec and XXX ops/sec)", msg)
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
@@ -340,7 +348,9 @@ def filter_img_info(output, filename):
line = filter_testfiles(line)
line = line.replace(imgfmt, 'IMGFMT')
line = re.sub('iters: [0-9]+', 'iters: XXX', line)
- line = re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', line)
+ line = re.sub('uuid: [-a-f0-9]+',
+ 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
+ line)
line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
lines.append(line)
return '\n'.join(lines)
@@ -538,11 +548,13 @@ class VM(qtest.QEMUQtestMachine):
self.pause_drive(drive, "write_aio")
return
self.qmp('human-monitor-command',
- command_line='qemu-io %s "break %s bp_%s"' % (drive, event, drive))
+ command_line='qemu-io %s "break %s bp_%s"'
+ % (drive, event, drive))
def resume_drive(self, drive):
self.qmp('human-monitor-command',
- command_line='qemu-io %s "remove_break bp_%s"' % (drive, drive))
+ command_line='qemu-io %s "remove_break bp_%s"'
+ % (drive, drive))
def hmp_qemu_io(self, drive, cmd):
'''Write to a given drive using an HMP command'''
@@ -802,16 +814,18 @@ class QMPTestCase(unittest.TestCase):
idx = int(idx)
if not isinstance(d, dict) or component not in d:
- self.fail('failed path traversal for "%s" in "%s"' % (path, str(d)))
+ self.fail(f'failed path traversal for "{path}" in "{d}"')
d = d[component]
if m:
if not isinstance(d, list):
- self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d)))
+ self.fail(f'path component "{component}" in "{path}" '
+ f'is not a list in "{d}"')
try:
d = d[idx]
except IndexError:
- self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
+ self.fail(f'invalid index "{idx}" in path "{path}" '
+ f'in "{d}"')
return d
def assert_qmp_absent(self, d, path):
@@ -862,10 +876,13 @@ class QMPTestCase(unittest.TestCase):
'''Asserts that the given filename is a json: filename and that its
content is equal to the given reference object'''
self.assertEqual(json_filename[:5], 'json:')
- self.assertEqual(self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
- self.vm.flatten_qmp_object(reference))
+ self.assertEqual(
+ self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
+ self.vm.flatten_qmp_object(reference)
+ )
- def cancel_and_wait(self, drive='drive0', force=False, resume=False, wait=60.0):
+ def cancel_and_wait(self, drive='drive0', force=False,
+ resume=False, wait=60.0):
'''Cancel a block job and wait for it to finish, returning the event'''
result = self.vm.qmp('block-job-cancel', device=drive, force=force)
self.assert_qmp(result, 'return', {})
@@ -889,8 +906,8 @@ class QMPTestCase(unittest.TestCase):
self.assert_no_active_block_jobs()
return result
- def wait_until_completed(self, drive='drive0', check_offset=True, wait=60.0,
- error=None):
+ def wait_until_completed(self, drive='drive0', check_offset=True,
+ wait=60.0, error=None):
'''Wait for a block job to finish, returning the event'''
while True:
for event in self.vm.get_qmp_events(wait=wait):
@@ -1029,8 +1046,11 @@ def verify_quorum():
notrun('quorum support missing')
def qemu_pipe(*args):
- '''Run qemu with an option to print something and exit (e.g. a help option),
- and return its output'''
+ """
+ Run qemu with an option to print something and exit (e.g. a help option).
+
+ :return: QEMU's stdout output.
+ """
args = [qemu_prog] + qemu_opts + list(args)
subp = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
@@ -1068,8 +1088,8 @@ def skip_if_unsupported(required_formats=(), read_only=False):
usf_list = list(set(fmts) - set(supported_formats(read_only)))
if usf_list:
- test_case.case_skip('{}: formats {} are not whitelisted'.format(
- test_case, usf_list))
+ msg = f'{test_case}: formats {usf_list} are not whitelisted'
+ test_case.case_skip(msg)
return None
else:
return func(test_case, *args, **kwargs)
@@ -18,5 +18,9 @@ disable=invalid-name,
too-many-locals,
too-many-public-methods,
# These are temporary, and should be removed:
- line-too-long,
missing-docstring,
+
+[FORMAT]
+
+# Maximum number of characters on a single line.
+max-line-length=79