# -*- coding: utf-8 -*-
"""
Interface to SSD object proposals.
"""
import logging
import utool as ut
import vtool as vt
from os.path import abspath, dirname, expanduser, join, exists  # NOQA
import numpy as np
import sys

(print, rrr, profile) = ut.inject2(__name__, '[ssd]')
logger = logging.getLogger('wbia')

# SCRIPT_PATH = abspath(dirname(__file__))
SCRIPT_PATH = abspath(expanduser(join('~', 'code', 'ssd')))

if not ut.get_argflag('--no-ssd'):
    try:
        assert exists(SCRIPT_PATH)

        def add_path(path):
            # if path not in sys.path:
            sys.path.insert(0, path)

        # Add pycaffe to PYTHONPATH
        pycaffe_path = join(SCRIPT_PATH, 'python')
        add_path(pycaffe_path)

        import caffe

        rrr(caffe)
        from google.protobuf import text_format
        from caffe.proto import caffe_pb2
    except AssertionError:
        logger.info('WARNING Failed to find ssd. SSD is unavailable')
        # if ut.SUPER_STRICT:
        #     raise
    except ImportError:
        logger.info('WARNING Failed to import caffe. SSD is unavailable')
        # if ut.SUPER_STRICT:
        #     raise


VERBOSE_SS = ut.get_argflag('--verbssd') or ut.VERBOSE


CONFIG_URL_DICT = {
    'pretrained-300-pascal': 'https://wildbookiarepository.azureedge.net/models/pretrained.ssd.300.pascal.prototxt',
    'pretrained-512-pascal': 'https://wildbookiarepository.azureedge.net/models/pretrained.ssd.512.pascal.prototxt',
    'pretrained-300-pascal-plus': 'https://wildbookiarepository.azureedge.net/models/pretrained.ssd.300.pascal.plus.prototxt',
    'pretrained-512-pascal-plus': 'https://wildbookiarepository.azureedge.net/models/pretrained.ssd.512.pascal.plus.prototxt',
    'pretrained-300-coco': 'https://wildbookiarepository.azureedge.net/models/pretrained.ssd.300.coco.prototxt',
    'pretrained-512-coco': 'https://wildbookiarepository.azureedge.net/models/pretrained.ssd.512.coco.prototxt',
    'pretrained-300-ilsvrc': 'https://wildbookiarepository.azureedge.net/models/pretrained.ssd.300.ilsvrc.prototxt',
    'pretrained-500-ilsvrc': 'https://wildbookiarepository.azureedge.net/models/pretrained.ssd.500.ilsvrc.prototxt',
    'default': 'https://wildbookiarepository.azureedge.net/models/pretrained.ssd.512.pascal.plus.prototxt',
    None: 'https://wildbookiarepository.azureedge.net/models/pretrained.ssd.512.pascal.plus.prototxt',
}


def _parse_weight_from_cfg(url):
    return url.replace('.prototxt', '.caffemodel')


def _parse_classes_from_cfg(url):
    return url.replace('.prototxt', '.classes')
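

# Illustrative note: each model in CONFIG_URL_DICT is published as a trio of
# files sharing a basename, so the weights (.caffemodel) and class-label
# (.classes) URLs are derived from the .prototxt URL by suffix substitution.
# A minimal sketch (the values follow from the naming convention above; nothing
# is downloaded here):
#
#   url = CONFIG_URL_DICT['pretrained-512-pascal']
#   _parse_weight_from_cfg(url)
#   # -> '.../models/pretrained.ssd.512.pascal.caffemodel'
#   _parse_classes_from_cfg(url)
#   # -> '.../models/pretrained.ssd.512.pascal.classes'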


def _parse_class_list(classes_filepath):
    # Load classes from file into the class list
    assert exists(classes_filepath)
    class_list = []
    with open(classes_filepath) as classes:
        for line in classes.readlines():
            line = line.strip()
            if len(line) > 0:
                class_list.append(line)
    return class_list


def detect_gid_list(ibs, gid_list, downsample=True, verbose=VERBOSE_SS, **kwargs):
    """
    Args:
        ibs (wbia.IBEISController): image analysis api
        gid_list (list of int): the list of IBEIS image_rowids that need detection
        downsample (bool, optional): a flag to indicate if the downsampled
            detection images should be used instead of the originals;
            defaults to True
            True:  ibs.get_image_detectpaths() is used
            False: ibs.get_image_paths() is used

    Kwargs:
        refer to the SSD documentation for configuration settings; passed
        through to detect() (e.g. config_filepath, weight_filepath,
        class_filepath, sensitivity, verbose)

    Yields:
        tuple: (gid, gpath, result_list)

    CommandLine:
        python -m wbia.algo.detect.ssd detect_gid_list --show

    Example:
        >>> # DISABLE_DOCTEST
        >>> from wbia.algo.detect.ssd import *  # NOQA
        >>> from wbia.core_images import LocalizerConfig
        >>> import wbia
        >>> ibs = wbia.opendb('testdb1')
        >>> gid_list = ibs.get_valid_gids()
        >>> config = {'verbose': True}
        >>> downsample = False
        >>> results_list = detect_gid_list(ibs, gid_list, downsample, **config)
        >>> results_list = list(results_list)
        >>> print('result lens = %r' % (map(len, list(results_list))))
        >>> print('result[0] = %r' % (len(list(results_list[0][2]))))
        >>> ut.quit_if_noshow()
        >>> import wbia.plottool as pt
        >>> ut.show_if_requested()
    """
    # Get new gpaths if downsampling
    if downsample:
        gpath_list = ibs.get_image_detectpaths(gid_list)
        neww_list = [vt.open_image_size(gpath)[0] for gpath in gpath_list]
        oldw_list = [oldw for (oldw, oldh) in ibs.get_image_sizes(gid_list)]
        downsample_list = [oldw / neww for oldw, neww in zip(oldw_list, neww_list)]
        orient_list = [1] * len(gid_list)
    else:
        gpath_list = ibs.get_image_paths(gid_list)
        downsample_list = [None] * len(gpath_list)
        orient_list = ibs.get_image_orientation(gid_list)
    # Run detection
    results_iter = detect(gpath_list, verbose=verbose, **kwargs)
    # Upscale the results
    _iter = zip(downsample_list, gid_list, orient_list, results_iter)
    for downsample, gid, orient, (gpath, result_list) in _iter:
        # Upscale the results back up to the original image size
        for result in result_list:
            if downsample is not None and downsample != 1.0:
                for key in ['xtl', 'ytl', 'width', 'height']:
                    result[key] = int(result[key] * downsample)
        yield (gid, gpath, result_list)
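

# Worked example of the rescaling step above (illustrative numbers): with
# downsample=True the detector runs on the resized ibs.get_image_detectpaths()
# images, so each returned coordinate is multiplied back by
# original_width / detect_width, e.g.
#
#   original_width, detect_width = 2048, 512   # hypothetical image widths
#   factor = original_width / detect_width     # 4.0
#   xtl_resized = 100                           # bbox x in the resized image
#   xtl_original = int(xtl_resized * factor)    # 400 in the original image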


def detect(
    gpath_list,
    config_filepath,
    weight_filepath,
    class_filepath,
    sensitivity,
    verbose=VERBOSE_SS,
    use_gpu=True,
    use_gpu_id=0,
    **kwargs,
):
    """
    Args:
        gpath_list (list of str): the list of image paths that need proposal candidates
        config_filepath (str): path to an SSD deploy prototxt file, or a
            shorthand key from CONFIG_URL_DICT
        weight_filepath (str): path to the matching caffemodel file, or a
            shorthand key from CONFIG_URL_DICT
        class_filepath (str): path to the class LabelMap file; if None, it is
            derived from the resolved config URL
        sensitivity (float): minimum confidence required to keep a detection

    Kwargs (optional): refer to the SSD documentation for configuration settings

    Returns:
        iter
    """

    def _get_label_name(class_labelmap, label_list):
        if not isinstance(label_list, list):
            label_list = [label_list]
        item_list = class_labelmap.item
        name_list = []
        for label in label_list:
            found = False
            for i in range(len(item_list)):
                if label == item_list[i].label:
                    found = True
                    name_list.append(item_list[i].display_name)
                    break
            assert found
        return name_list

    # Get correct config if specified with shorthand
    config_url = None
    if config_filepath in CONFIG_URL_DICT:
        config_url = CONFIG_URL_DICT[config_filepath]
        config_filepath = ut.grab_file_url(config_url, appname='wbia', check_hash=True)

    # Get correct weights if specified with shorthand
    if weight_filepath in CONFIG_URL_DICT:
        if weight_filepath is None and config_url is not None:
            config_url_ = config_url
        else:
            config_url_ = CONFIG_URL_DICT[weight_filepath]
        weight_url = _parse_weight_from_cfg(config_url_)
        weight_filepath = ut.grab_file_url(weight_url, appname='wbia', check_hash=True)

    if class_filepath is None:
        class_url = _parse_classes_from_cfg(config_url)
        class_filepath = ut.grab_file_url(
            class_url, appname='wbia', check_hash=True, verbose=verbose
        )

    # Load class labels
    with open(class_filepath, 'r') as class_file:
        class_labelmap = caffe_pb2.LabelMap()
        class_str = str(class_file.read())
        text_format.Merge(class_str, class_labelmap)

    # Need to convert unicode strings to Python strings to support Boost Python
    # call signatures in caffe
    prototxt_filepath = str(config_filepath)  # alias to Caffe nomenclature
    caffemodel_filepath = str(weight_filepath)  # alias to Caffe nomenclature

    assert exists(prototxt_filepath), 'Specified prototxt file not found'
    assert exists(caffemodel_filepath), 'Specified caffemodel file not found'

    if use_gpu:
        caffe.set_mode_gpu()
        caffe.set_device(use_gpu_id)
    else:
        caffe.set_mode_cpu()

    net = caffe.Net(prototxt_filepath, caffemodel_filepath, caffe.TEST)

    # Determine input size from the prototxt
    with open(prototxt_filepath, 'r') as prototxt_file:
        # Load all lines
        line_list = prototxt_file.readlines()
        # Look for dim size lines
        line_list = [line for line in line_list if 'dim:' in line]
        line_list = line_list[:4]
        # Get last line
        line = line_list[-1]
        line_ = line.strip().split(' ')
        # Filter empty spaces
        line_ = [_ for _ in line_ if len(_) > 0]
        # Get last value on line, which should be the image size
        image_resize = int(line_[-1])
        # Check to make sure
        assert image_resize in [300, 500, 512]
    logger.info('FOUND image_resize = %r' % (image_resize,))

    # Input preprocessing: 'data' is the name of the input blob == net.inputs[0]
    transformer = caffe.io.Transformer({'data': net.blobs['data'].data.shape})
    transformer.set_transpose('data', (2, 0, 1))
    # Mean pixel value
    transformer.set_mean('data', np.array([104, 117, 123]))
    # The reference model operates on images in [0, 255] range instead of [0, 1]
    transformer.set_raw_scale('data', 255)
    # The reference model has channels in BGR order instead of RGB
    transformer.set_channel_swap('data', (2, 1, 0))

    # Set batch size to 1 and set testing image size
    net.blobs['data'].reshape(1, 3, image_resize, image_resize)

    results_list_ = []
    for gpath in gpath_list:
        image = caffe.io.load_image(gpath)
        transformed_image = transformer.preprocess('data', image)
        net.blobs['data'].data[...] = transformed_image

        # Forward pass
        detections = net.forward()['detection_out']

        # Parse the outputs
        det_label = detections[0, 0, :, 1]
        det_conf = detections[0, 0, :, 2]
        det_xmin = detections[0, 0, :, 3]
        det_ymin = detections[0, 0, :, 4]
        det_xmax = detections[0, 0, :, 5]
        det_ymax = detections[0, 0, :, 6]

        # Keep detections with confidence at or above the sensitivity threshold
        top_indices = [i for i, conf in enumerate(det_conf) if conf >= sensitivity]

        top_conf = det_conf[top_indices]
        top_label_indices = det_label[top_indices].tolist()
        top_labels = _get_label_name(class_labelmap, top_label_indices)
        top_xmin = det_xmin[top_indices]
        top_ymin = det_ymin[top_indices]
        top_xmax = det_xmax[top_indices]
        top_ymax = det_ymax[top_indices]

        height, width = image.shape[:2]

        # Compile results
        result_list_ = []
        zipped = zip(top_xmin, top_ymin, top_xmax, top_ymax, top_labels, top_conf)
        for (xmin, ymin, xmax, ymax, label, conf) in zipped:
            xtl = int(np.around(xmin * width))
            ytl = int(np.around(ymin * height))
            xbr = int(np.around(xmax * width))
            ybr = int(np.around(ymax * height))
            confidence = float(conf)
            result_dict = {
                'xtl': xtl,
                'ytl': ytl,
                'width': xbr - xtl,
                'height': ybr - ytl,
                'class': label,
                'confidence': confidence,
            }
            result_list_.append(result_dict)
        results_list_.append(result_list_)

    results_list = zip(gpath_list, results_list_)
    return results_list
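

# Illustrative usage sketch (the image path and sensitivity value are
# hypothetical placeholders; running this requires the pycaffe/SSD setup from
# the top of this module):
#
#   results_iter = detect(
#       ['/path/to/image.jpg'],
#       config_filepath='pretrained-512-pascal-plus',
#       weight_filepath='pretrained-512-pascal-plus',
#       class_filepath=None,
#       sensitivity=0.6,
#       use_gpu=False,
#   )
#   for gpath, result_list in results_iter:
#       for result in result_list:
#           print(gpath, result['class'], result['confidence'])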