# Source code for wbia.dbio.ingest_hsdb

# -*- coding: utf-8 -*-
#!/usr/bin/env python  # NOQA
"""
Converts a hotspotter database to IBEIS
"""
# TODO: ADD COPYRIGHT TAG
import logging
from os.path import join, exists
from wbia import constants as const
from wbia.init import sysres
import utool as ut
import re
import csv

# utool injection: gives this module a reloadable print / rrr / profile triple
print, rrr, profile = ut.inject2(__name__)
logger = logging.getLogger('wbia')


# Flag file written into the generated _ibsdb to mark a completed conversion
SUCCESS_FLAG_FNAME = '_hsdb_to_ibeis_convert_success'


def is_hsdb(dbdir):
    """Return True if ``dbdir`` is a hotspotter database (v4 or v3 layout)."""
    if is_hsdbv4(dbdir):
        return True
    return is_hsdbv3(dbdir)
def is_hsdbv4(dbdir):
    """Return True if ``dbdir`` has a v4 hotspotter ``_hsdb`` directory.

    Requires the ``_hsdb`` directory itself plus its name/image/chip tables.
    """
    internal = join(dbdir, '_hsdb')
    table_fnames = ('name_table.csv', 'image_table.csv', 'chip_table.csv')
    return exists(internal) and all(
        exists(join(internal, fname)) for fname in table_fnames
    )
def is_hsdbv3(dbdir):
    """Return True if ``dbdir`` has a v3 hotspotter ``.hs_internals`` directory.

    Requires the ``.hs_internals`` directory itself plus its
    name/image/chip tables.
    """
    internal = join(dbdir, '.hs_internals')
    table_fnames = ('name_table.csv', 'image_table.csv', 'chip_table.csv')
    return exists(internal) and all(
        exists(join(internal, fname)) for fname in table_fnames
    )
def get_hsinternal(hsdb_dir):
    """Return the hotspotter internal table directory for ``hsdb_dir``.

    ``_hsdb`` for a v4 database, ``.hs_internals`` otherwise.
    """
    if is_hsdbv4(hsdb_dir):
        return join(hsdb_dir, '_hsdb')
    return join(hsdb_dir, '.hs_internals')
def is_hsinternal(dbdir):
    """Return True if ``dbdir`` contains a legacy ``.hs_internals`` directory."""
    internal_path = join(dbdir, '.hs_internals')
    return exists(internal_path)
def is_succesful_convert(dbdir):
    """the sucess flag is only written if the _ibsdb was properly generated"""
    # NOTE: function name typo ("succesful") is part of the public API; kept.
    flag_fpath = join(dbdir, const.PATH_NAMES._ibsdb, SUCCESS_FLAG_FNAME)
    return exists(flag_fpath)
def get_unconverted_hsdbs(workdir=None):
    r"""
    Find hotspotter databases under ``workdir`` that still need conversion.

    Args:
        workdir (None): directory containing databases
            (default = None, meaning ``sysres.get_workdir()``)

    Returns:
        list: paths of unconverted hotspotter databases

    CommandLine:
        python -m wbia.dbio.ingest_hsdb --test-get_unconverted_hsdbs

    Example:
        >>> # SCRIPT
        >>> from wbia.dbio.ingest_hsdb import *  # NOQA
        >>> workdir = None
        >>> result = get_unconverted_hsdbs(workdir)
        >>> print(result)
    """
    import os

    if workdir is None:
        workdir = sysres.get_workdir()
    # A plain list is sufficient here; the previous numpy array added an
    # unnecessary dependency for simple path bookkeeping.
    dbpath_list = [join(workdir, name) for name in os.listdir(workdir)]
    needs_convert = list(map(check_unconverted_hsdb, dbpath_list))
    needs_convert_hsdbs = ut.compress(dbpath_list, needs_convert)
    return needs_convert_hsdbs
def ingest_unconverted_hsdbs_in_workdir():
    """Convert every unconverted hotspotter database found in the workdir.

    Exceptions during a conversion are logged via ``ut.printex`` and then
    re-raised (no database is silently skipped).
    """
    workdir = sysres.get_workdir()
    for hsdb_dpath in get_unconverted_hsdbs(workdir):
        try:
            convert_hsdb_to_wbia(hsdb_dpath)
        except Exception as ex:
            ut.printex(ex)
            raise
def check_unconverted_hsdb(dbdir):
    """
    Returns if a directory is an unconverted hotspotter database
    """
    if not is_hsdb(dbdir):
        return False
    return not is_succesful_convert(dbdir)
def testdata_ensure_unconverted_hsdb():
    r"""
    Makes an unconverted test datapath

    CommandLine:
        python -m wbia.dbio.ingest_hsdb --test-testdata_ensure_unconverted_hsdb

    Example:
        >>> # SCRIPT
        >>> from wbia.dbio.ingest_hsdb import *  # NOQA
        >>> result = testdata_ensure_unconverted_hsdb()
        >>> print(result)
    """
    import utool as ut

    assert ut.is_developer(), 'dev function only'
    # Refresh the unconverted test database from the pristine copy
    hsdb_dir = '/raid/tests/tmp/Frogs'
    ut.ensurepath('/raid/tests/tmp')
    ut.delete(hsdb_dir)
    ut.copy('/raid/tests/Frogs', hsdb_dir)
    return hsdb_dir
def _hs_read_name_table(nametbl_fpath):
    """Parse a hotspotter name_table.csv -> (hs_nid list, name text list).

    The unknown name ``'____'`` with nid 0 is always prepended.
    """
    name_text_list = ['____']
    name_hs_nid_list = [0]
    with open(nametbl_fpath, 'r') as nametbl_file:
        for row in csv.reader(nametbl_file):
            # Skip blank rows and '#'-prefixed header/comment rows
            if len(row) == 0 or row[0].strip().startswith('#'):
                continue
            name_hs_nid_list.append(int(row[0]))
            name_text_list.append(row[1].strip())
    return name_hs_nid_list, name_text_list


def _hs_read_image_table(imgtbl_fpath):
    """Parse a hotspotter image_table.csv -> (hs_gids, gnames, reviewed flags)."""
    image_hs_gid_list = []
    image_gname_list = []
    image_reviewed_list = []
    with open(imgtbl_fpath, 'r') as imgtbl_file:
        for row in csv.reader(imgtbl_file):
            if len(row) == 0 or row[0].strip().startswith('#'):
                continue
            image_hs_gid_list.append(int(row[0]))
            image_gname_list.append(row[1].strip())
            # aif in hotspotter is equivalent to reviewed in IBEIS.
            # NOTE(review): bool(row[2]) is True for ANY non-empty string,
            # including '0'; preserved as-is to match legacy behavior — confirm
            # against real hotspotter table contents before changing.
            image_reviewed_list.append(bool(row[2]))
    return image_hs_gid_list, image_gname_list, image_reviewed_list


def _hs_read_chip_table(chiptbl_fpath):
    """Parse a hotspotter chip_table.csv into parallel column lists.

    Returns:
        tuple: (hs_gids, hs_nids, bboxes, thetas, notes)
    """
    chip_hs_gid_list = []
    chip_hs_nid_list = []
    chip_bbox_list = []
    chip_theta_list = []
    chip_note_list = []
    with open(chiptbl_fpath, 'r') as chiptbl_file:
        for row in csv.reader(chiptbl_file):
            if len(row) == 0 or row[0].strip().startswith('#'):
                continue
            # bbox is stored like '[x y w h]'; strip brackets then split on
            # whitespace. str.split() replaces the old zero-width
            # re.sub(' *', ' ', ...) pattern, which on Python >= 3.7 inserts
            # separators at every position and made int('') raise.
            bbox_text = row[3].replace('[', '').replace(']', '').strip()
            bbox = tuple(int(item) for item in bbox_text.split())
            chip_hs_gid_list.append(int(row[1]))
            chip_hs_nid_list.append(int(row[2]))
            chip_bbox_list.append(bbox)
            chip_theta_list.append(float(row[4]))
            # Any remaining columns are free-form note text; commas inside
            # notes were escaped as <COMMA> by hotspotter.
            chip_note_list.append('<COMMA>'.join([item.strip() for item in row[5:]]))
    return (
        chip_hs_gid_list,
        chip_hs_nid_list,
        chip_bbox_list,
        chip_theta_list,
        chip_note_list,
    )


def convert_hsdb_to_wbia(hsdir, dbdir=None, **kwargs):
    r"""
    Convert a hotspotter database into a wbia (IBEIS) database.

    Args:
        hsdir (str): Directory to folder *containing* _hsdb
        dbdir (str): Output directory (defaults to same as hsdb)
        **kwargs: forwarded to ``IBEISControl.request_IBEISController``

    Returns:
        IBEISController: controller for the populated target database

    CommandLine:
        python -m wbia convert_hsdb_to_wbia --dbdir ~/work/Frogs
        python -m wbia convert_hsdb_to_wbia --hsdir "/raid/raw/RotanTurtles/Roatan HotSpotter Nov_21_2016"

    Ignore:
        from wbia.dbio.ingest_hsdb import *  # NOQA
        hsdir = "/raid/raw/RotanTurtles/Roatan HotSpotter Nov_21_2016"
        dbdir = "~/work/RotanTurtles"

    Example:
        >>> # SCRIPT
        >>> from wbia.dbio.ingest_hsdb import *  # NOQA
        >>> dbdir = ut.get_argval('--dbdir', type_=str, default=None)
        >>> hsdir = ut.get_argval('--hsdir', type_=str, default=dbdir)
        >>> result = convert_hsdb_to_wbia(hsdir)
        >>> print(result)
    """
    from wbia.control import IBEISControl
    import utool as ut

    if dbdir is None:
        dbdir = hsdir
    logger.info('[ingest] Ingesting hsdb: %r -> %r' % (hsdir, dbdir))

    assert is_hsdb(
        hsdir
    ), 'not a hotspotter database. cannot even force convert: hsdir=%r' % (hsdir,)
    assert not is_succesful_convert(dbdir), 'hsdir=%r is already converted' % (hsdir,)

    imgdir = join(hsdir, 'images')
    internal_dir = get_hsinternal(hsdir)

    # READ TABLES (note: old local-name typo iamge_* fixed to image_*)
    name_hs_nid_list, name_text_list = _hs_read_name_table(
        join(internal_dir, 'name_table.csv')
    )
    image_hs_gid_list, image_gname_list, image_reviewed_list = _hs_read_image_table(
        join(internal_dir, 'image_table.csv')
    )
    (
        chip_hs_gid_list,
        chip_hs_nid_list,
        chip_bbox_list,
        chip_theta_list,
        chip_note_list,
    ) = _hs_read_chip_table(join(internal_dir, 'chip_table.csv'))

    image_gpath_list = [join(imgdir, gname) for gname in image_gname_list]
    ut.debug_duplicate_items(image_gpath_list)
    image_exist_flags = list(map(exists, image_gpath_list))
    missing_images = []
    for image_gpath, flag in zip(image_gpath_list, image_exist_flags):
        if not flag:
            missing_images.append(image_gpath)
            logger.info('Image does not exist: %s' % image_gpath)

    if not all(image_exist_flags):
        logger.info(
            'Only %d / %d image exist'
            % (sum(image_exist_flags), len(image_exist_flags))
        )

    SEARCH_FOR_IMAGES = False
    if SEARCH_FOR_IMAGES:
        # Hack to try and find the missing images elsewhere under hsdir
        from os.path import basename

        subfiles = ut.glob(hsdir, '*', recursive=True, fullpath=True, with_files=True)
        basename_to_existing = ut.group_items(subfiles, ut.lmap(basename, subfiles))
        can_copy_list = []
        for gpath in missing_images:
            gname = basename(gpath)
            if gname not in basename_to_existing:
                logger.info('gname = %r' % (gname,))
            else:
                existing = basename_to_existing[gname]
                # Only auto-choose when all candidates are byte-identical
                can_choose = True
                if len(existing) > 1:
                    if not ut.allsame(ut.lmap(ut.get_file_uuid, existing)):
                        can_choose = False
                if can_choose:
                    found = existing[0]
                    can_copy_list.append((found, gpath))
                else:
                    logger.info(existing)
        src, dst = ut.listT(can_copy_list)
        ut.copy_list(src, dst)

    names = ut.ColumnLists({'hs_nid': name_hs_nid_list, 'text': name_text_list})
    images = ut.ColumnLists(
        {
            'hs_gid': image_hs_gid_list,
            'gpath': image_gpath_list,
            'reviewed': image_reviewed_list,
            'exists': image_exist_flags,
        }
    )
    chips = ut.ColumnLists(
        {
            'hs_gid': chip_hs_gid_list,
            'hs_nid': chip_hs_nid_list,
            'bbox': chip_bbox_list,
            'theta': chip_theta_list,
            'note': chip_note_list,
        }
    )

    IGNORE_MISSING_IMAGES = True
    if IGNORE_MISSING_IMAGES:
        # Drop missing images and the chips that depend on them
        logger.info('pre')
        logger.info('chips = %r' % (chips,))
        logger.info('images = %r' % (images,))
        logger.info('names = %r' % (names,))
        missing_gxs = ut.where(ut.not_list(images['exists']))
        missing_gids = ut.take(images['hs_gid'], missing_gxs)
        gid_to_cxs = ut.dzip(*chips.group_indicies('hs_gid'))
        missing_cxs = ut.flatten(ut.take(gid_to_cxs, missing_gids))
        images = images.remove(missing_gxs)
        chips = chips.remove(missing_cxs)
        # Drop names that no longer have chips (nid 0 = unknown always kept)
        valid_nids = set(chips['hs_nid'] + [0])
        isvalid = [nid in valid_nids for nid in names['hs_nid']]
        names = names.compress(isvalid)
        logger.info('post')
        logger.info('chips = %r' % (chips,))
        logger.info('images = %r' % (images,))
        logger.info('names = %r' % (names,))

    assert all(images['exists']), 'some images dont exist'

    ibs = IBEISControl.request_IBEISController(dbdir=dbdir, check_hsdb=False, **kwargs)
    assert len(ibs.get_valid_gids()) == 0, 'target database is not empty'

    # Add names and images; any failed gids will be None
    names['ibs_nid'] = ibs.add_names(names['text'])
    images['ibs_gid'] = ibs.add_images(images['gpath'])

    if True:
        # Remove images that failed to import (ibs_gid is None) and the
        # chips that depend on them
        logger.info('pre')
        logger.info('chips = %r' % (chips,))
        logger.info('images = %r' % (images,))
        logger.info('names = %r' % (names,))
        missing_gxs = ut.where(ut.flag_None_items(images['ibs_gid']))
        missing_gids = ut.take(images['hs_gid'], missing_gxs)
        gid_to_cxs = ut.dzip(*chips.group_indicies('hs_gid'))
        missing_cxs = ut.flatten(ut.take(gid_to_cxs, missing_gids))
        chips = chips.remove(missing_cxs)
        images = images.remove(missing_gxs)
        logger.info('post')
        logger.info('chips = %r' % (chips,))
        logger.info('images = %r' % (images,))
        logger.info('names = %r' % (names,))

    # Map hotspotter ids to new ibs rowids (missing keys -> None). This is
    # equivalent to the old try-ut.take/except-KeyError + .get fallback.
    ibs_gid_lookup = ut.dzip(images['hs_gid'], images['ibs_gid'])
    ibs_nid_lookup = ut.dzip(names['hs_nid'], names['ibs_nid'])
    chips['ibs_gid'] = [ibs_gid_lookup.get(gid, None) for gid in chips['hs_gid']]
    chips['ibs_nid'] = [ibs_nid_lookup.get(nid, None) for nid in chips['hs_nid']]

    ibs.add_annots(
        chips['ibs_gid'],
        bbox_list=chips['bbox'],
        theta_list=chips['theta'],
        nid_list=chips['ibs_nid'],
        notes_list=chips['note'],
    )

    # Write file flagging successful conversion
    with open(join(ibs.get_ibsdir(), SUCCESS_FLAG_FNAME), 'w') as file_:
        file_.write('Successfully converted hsdir=%r' % (hsdir,))
    logger.info('finished ingest')
    return ibs