Source code for swiftly.cli.ping

"""
Contains a CLICommand that implements ping test functionality.

Uses the following from :py:class:`swiftly.cli.context.CLIContext`:

===============  ====================================================
client_manager   For connecting to Swift.
concurrency      The number of concurrent actions that can be
                 performed.
io_manager       For directing output.
limit            The maximum number of Swift nodes to output
                 information about.
object_ring      An instance of swift.common.ring.ring.Ring if you
                 want a report based on Swift nodes with implied
                 usage during the ping test.
ping_begin       The first time.time() when the entire ping test
                 began.
ping_begin_last  The time.time() the last ping task started.
ping_count       The number of objects to use.
ping_verbose     True if you want a full ping report rather than just
                 the overall time.
threshold        Defines the threshold for the threshold node report.
                 This is the multiplier over the average request
                 time.
===============  ====================================================
"""
"""
Copyright 2011-2013 Gregory Holt

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import collections
import StringIO
import time
import traceback
import uuid

from swiftly.concurrency import Concurrency
from swiftly.cli.command import CLICommand, ReturnCode

try:
    from eventlet import sleep
except ImportError:
    sleep = time.sleep


def _cli_ping_status(context, heading, ident, status, reason, headers,
                     contents):
    if headers:
        ident = headers.get('x-trans-id') or ident
    if hasattr(contents, 'read'):
        contents.read()
    if status and status // 100 != 2:
        raise ReturnCode(
            'with %s: %s %s %s' % (heading, status, reason, ident))
    now = time.time()
    if context.ping_verbose:
        with context.io_manager.with_stdout() as fp:
            fp.write(
                '% 6.02fs %s %s\n' %
                (now - context.ping_begin_last, heading, ident))
            fp.flush()
    context.ping_begin_last = now


def _cli_ping_objects(context, heading, conc, container, objects, func,
                      results):
    begin = time.time()
    for obj in objects:
        for (exc_type, exc_value, exc_tb, result) in \
                conc.get_results().itervalues():
            if exc_value:
                with context.io_manager.with_stderr() as fp:
                    fp.write(str(exc_value))
                    fp.write('\n')
                    fp.flush()
        conc.spawn(obj, func, context, results, container, obj)
    conc.join()
    for (exc_type, exc_value, exc_tb, result) in \
            conc.get_results().itervalues():
        if exc_value:
            with context.io_manager.with_stderr() as fp:
                fp.write(str(exc_value))
                fp.write('\n')
                fp.flush()
    elapsed = time.time() - begin
    _cli_ping_status(
        context,
        'object %s x%d at %d concurrency, %.02f/s' %
        (heading, len(objects), conc.concurrency, len(objects) / elapsed),
        None, None, None, None, None)
    overall = results.get('overall')
    if overall:
        overall = sorted(overall, key=lambda x: x[0])
        results['overall'] = overall
        if context.ping_verbose or context.graphite:
            best = overall[0][0]
            worst = overall[-1][0]
            mean = overall[len(overall) / 2][0]
            median = sum(x[0] for x in overall) / len(overall)
            threshold = max(2, mean * 2)
            slows = 0
            for x in overall:
                if x[0] > 2 and x[0] > threshold:
                    slows += 1
            slow_percentage = 100.0 * slows / len(overall)
            with context.io_manager.with_stdout() as fp:
                if context.ping_verbose:
                    fp.write(
                        '        best %.02fs, worst %.02fs, mean %.02fs, '
                        'median %.02fs\n        %d slower than 2s or twice '
                        'the mean, %.02f%%\n' % (
                            best, worst, mean, median, slows, slow_percentage))
                    fp.flush()
                if context.graphite:
                    fp.write(
                        '%s.%s.slow_percentage %.02f %d\n' % (
                            context.graphite, heading, slow_percentage,
                            time.time()))


def _cli_ping_object_put(context, results, container, obj):
    with context.client_manager.with_client() as client:
        begin = time.time()
        try:
            status, reason, headers, contents = client.put_object(
                container, obj, StringIO.StringIO('swiftly-ping'))
        except Exception:
            raise ReturnCode(
                'putting object %r: %s' % (obj, traceback.format_exc()))
        if status // 100 != 2:
            raise ReturnCode(
                'putting object %r: %s %s %s' %
                (obj, status, reason, headers.get('x-trans-id') or '-'))
        elapsed = time.time() - begin
        results['overall'].append((elapsed, headers.get('x-trans-id') or obj))
        if context.object_ring:
            for node in context.object_ring.get_nodes(
                    client.get_account_hash(), container, obj)[1]:
                results[node['ip']].append(
                    (elapsed, headers.get('x-trans-id') or obj))


def _cli_ping_object_get(context, results, container, obj):
    with context.client_manager.with_client() as client:
        begin = time.time()
        try:
            status, reason, headers, contents = client.get_object(
                container, obj, stream=False)
        except Exception:
            raise ReturnCode(
                'getting object %r: %s' % (obj, traceback.format_exc()))
        if status // 100 != 2:
            raise ReturnCode(
                'getting object %r: %s %s %s' %
                (obj, status, reason, headers.get('x-trans-id') or '-'))
        elapsed = time.time() - begin
        results['overall'].append((elapsed, headers.get('x-trans-id') or obj))
        if context.object_ring:
            for node in context.object_ring.get_nodes(
                    client.get_account_hash(), container, obj)[1]:
                results[node['ip']].append(
                    (elapsed, headers.get('x-trans-id') or obj))


def _cli_ping_object_delete(context, results, container, obj):
    with context.client_manager.with_client() as client:
        begin = time.time()
        try:
            status, reason, headers, contents = client.delete_object(
                container, obj)
        except Exception:
            raise ReturnCode(
                'deleting object %r: %s' % (obj, traceback.format_exc()))
        if status // 100 != 2 and status != 404:
            raise ReturnCode(
                'deleting object %r: %s %s %s' %
                (obj, status, reason, headers.get('x-trans-id') or '-'))
        elapsed = time.time() - begin
        results['overall'].append((elapsed, headers.get('x-trans-id') or obj))
        if context.object_ring:
            for node in context.object_ring.get_nodes(
                    client.get_account_hash(), container, obj)[1]:
                results[node['ip']].append(
                    (elapsed, headers.get('x-trans-id') or obj))


def _cli_ping_ring_report(context, timings_dict, label):
    timings_dict.pop('overall', None)  # Not currently used in this function
    if not timings_dict:
        return
    worsts = {}
    for ip, timings in timings_dict.iteritems():
        worst = [0, None]
        for timing in timings:
            if timing[0] > worst[0]:
                worst = timing
        worsts[ip] = worst
    with context.io_manager.with_stdout() as fp:
        fp.write(
            'Worst %s times for up to %d nodes with implied usage:\n' %
            (label, context.limit))
        for ip, (elapsed, xid) in sorted(
                worsts.iteritems(), key=lambda x: x[1][0],
                reverse=True)[:context.limit]:
            fp.write('    %20s % 6.02fs %s\n' % (ip, elapsed, xid))
        fp.flush()
    with context.io_manager.with_stdout() as fp:
        averages = {}
        for ip, timings in timings_dict.iteritems():
            averages[ip] = sum(t[0] for t in timings) / len(timings)
        fp.write(
            'Average %s times for up to %d nodes with implied usage:\n' %
            (label, context.limit))
        for ip, elapsed in sorted(
                averages.iteritems(), key=lambda x: x[1],
                reverse=True)[:context.limit]:
            fp.write('    %20s % 6.02fs\n' % (ip, elapsed))
        fp.flush()
    total = 0.0
    count = 0
    for ip, timings in timings_dict.iteritems():
        total += sum(t[0] for t in timings)
        count += len(timings)
    threshold = total / count * context.threshold
    counts = collections.defaultdict(lambda: 0)
    for ip, timings in timings_dict.iteritems():
        for t in timings:
            if t[0] > threshold:
                counts[ip] += 1
    with context.io_manager.with_stdout() as fp:
        fp.write(
            'Count of %s times past (average * %d) for up to %d nodes with '
            'implied usage:\n' % (label, context.threshold, context.limit))
        for ip, count in sorted(
                counts.iteritems(), key=lambda x: x[1],
                reverse=True)[:context.limit]:
            fp.write('    %20s % 6d\n' % (ip, count))
        fp.flush()
    percentages = {}
    for ip, count in counts.iteritems():
        percentages[ip] = (
            100.0 * count / len(timings_dict[ip]),
            count, len(timings_dict[ip]))
    with context.io_manager.with_stdout() as fp:
        fp.write(
            'Percentage of %s times past (average * %d) for up to %d nodes '
            'with implied usage:\n' %
            (label, context.threshold, context.limit))
        for ip, percentage in sorted(
                percentages.iteritems(), key=lambda x: x[1][0],
                reverse=True)[:context.limit]:
            fp.write(
                '    %20s % 6.02f%%  %d of %d\n' %
                (ip, percentage[0], percentage[1], percentage[2]))
        fp.flush()


[docs]def cli_ping(context, prefix): """ Performs a ping test. See :py:mod:`swiftly.cli.ping` for context usage information. See :py:class:`CLIPing` for more information. :param context: The :py:class:`swiftly.cli.context.CLIContext` to use. :param prefix: The container name prefix to use. Default: swiftly-ping """ if not prefix: prefix = 'swiftly-ping' ping_ring_object_puts = collections.defaultdict(lambda: []) ping_ring_object_gets = collections.defaultdict(lambda: []) ping_ring_object_deletes = collections.defaultdict(lambda: []) context.ping_begin = context.ping_begin_last = time.time() container = prefix + '-' + uuid.uuid4().hex objects = [uuid.uuid4().hex for x in xrange(context.ping_count)] conc = Concurrency(context.concurrency) with context.client_manager.with_client() as client: client.auth() _cli_ping_status(context, 'auth', '-', None, None, None, None) _cli_ping_status(context, 'account head', '-', *client.head_account()) _cli_ping_status( context, 'container put', '-', *client.put_container(container)) if _cli_ping_objects( context, 'put', conc, container, objects, _cli_ping_object_put, ping_ring_object_puts): with context.io_manager.with_stderr() as fp: fp.write( 'ERROR put objects did not complete successfully due to ' 'previous error; but continuing\n') fp.flush() if _cli_ping_objects( context, 'get', conc, container, objects, _cli_ping_object_get, ping_ring_object_gets): with context.io_manager.with_stderr() as fp: fp.write( 'ERROR get objects did not complete successfully due to ' 'previous error; but continuing\n') fp.flush() if _cli_ping_objects( context, 'delete', conc, container, objects, _cli_ping_object_delete, ping_ring_object_deletes): with context.io_manager.with_stderr() as fp: fp.write( 'ERROR delete objects did not complete successfully due to ' 'previous error; but continuing\n') fp.flush() for attempt in xrange(5): if attempt: sleep(2**attempt) with context.client_manager.with_client() as client: try: _cli_ping_status( context, 'container delete', '-', *client.delete_container(container)) break except ReturnCode as err: with context.io_manager.with_stderr() as fp: fp.write(str(err)) fp.write('\n') fp.flush() else: with context.io_manager.with_stderr() as fp: fp.write( 'ERROR could not confirm deletion of container due to ' 'previous error; but continuing\n') fp.flush() end = time.time() with context.io_manager.with_stdout() as fp: if context.graphite: fp.write( '%s.ping_overall %.02f %d\n' % ( context.graphite, end - context.ping_begin, time.time())) if context.ping_verbose: fp.write('% 6.02fs total\n' % (end - context.ping_begin)) elif not context.graphite: fp.write('%.02fs\n' % (end - context.ping_begin)) fp.flush() ping_ring_overall = collections.defaultdict(lambda: []) _cli_ping_ring_report(context, ping_ring_object_puts, 'PUT') for ip, timings in ping_ring_object_puts.iteritems(): ping_ring_overall[ip].extend(timings) _cli_ping_ring_report(context, ping_ring_object_gets, 'GET') for ip, timings in ping_ring_object_gets.iteritems(): ping_ring_overall[ip].extend(timings) _cli_ping_ring_report(context, ping_ring_object_deletes, 'DELETE') for ip, timings in ping_ring_object_deletes.iteritems(): ping_ring_overall[ip].extend(timings) _cli_ping_ring_report(context, ping_ring_overall, 'overall')
[docs]class CLIPing(CLICommand): """ A CLICommand that implements ping test functionality. See the output of ``swiftly help ping`` for more information. """ def __init__(self, cli): super(CLIPing, self).__init__( cli, 'ping', max_args=1, usage=""" Usage: %prog [main_options] ping [options] [path] For help on [main_options] run %prog with no args. Runs a ping test against the account. The [path] will be used as a prefix to the random container name used (default: swiftly-ping).""".strip()) self.option_parser.add_option( '-v', '--verbose', dest='ping_verbose', action='store_true', help='Outputs additional information as ping works.') self.option_parser.add_option( '-c', '--count', dest='ping_count', default=1, help='Count of objects to create; default 1.') self.option_parser.add_option( '-o', '--object-ring', dest='object_ring', help='The current object ring of the cluster being pinged. This ' 'will enable output of which nodes are involved in the ' 'object requests and their implied behavior. Use of this ' 'also requires the main Swift code is installed and ' 'importable.') self.option_parser.add_option( '-l', '--limit', dest='limit', help='Limits the node output tables to LIMIT nodes.') self.option_parser.add_option( '-t', '--threshold', dest='threshold', help='Changes the threshold for the final (average * x) reports. ' 'This will define the value of x, defaults to 2.') self.option_parser.add_option( '-g', '--graphite', dest='graphite', metavar='PREFIX', help='Switches to "graphite" output. The output will be lines of ' '"PREFIX.metric value timestamp" suitable for piping to ' 'graphite (through netcat or something similar).') def __call__(self, args): options, args, context = self.parse_args_and_create_context(args) context.ping_count = int(options.ping_count or 1) context.ping_verbose = options.ping_verbose context.object_ring = None if options.object_ring: import swift.common.ring.ring context.object_ring = swift.common.ring.ring.Ring( options.object_ring) context.limit = int(options.limit or 10) context.threshold = int(options.threshold or 2) context.graphite = options.graphite prefix = args.pop(0) if args else 'swiftly-ping' return cli_ping(context, prefix)