# -*- coding: utf-8 -*-
# TODO: find unused functions and kill them
import logging
import copy
import operator
import utool as ut
import vtool as vt
import numpy as np
import itertools as it
from functools import partial, reduce
from wbia.expt import cfghelpers
from wbia.expt import experiment_helpers
print, rrr, profile = ut.inject2(__name__)
logger = logging.getLogger('wbia')
[docs]def build_cmsinfo(cm_list, qreq_):
r"""
Helper function to report results over multiple queries (chip matches).
Basically given a group of queries of the same name, we only care if one of
them is correct. This emulates encounters.
Runs queries of a specific configuration returns the best rank of each
query.
Args:
cm_list (list): list of chip matches
qreq_ (QueryRequest): request that computed the chip matches.
Returns:
dict: cmsinfo - info about multiple chip matches cm_list
CommandLine:
python -m wbia get_query_result_info
python -m wbia get_query_result_info:0 --db lynx \
-a :qsame_imageset=True,been_adjusted=True,excluderef=True -t :K=1
python -m wbia get_query_result_info:0 --db lynx \
-a :qsame_imageset=True,been_adjusted=True,excluderef=True -t :K=1 --cmd
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.expt.test_result import * # NOQA
>>> import wbia
>>> qreq_ = wbia.main_helpers.testdata_qreq_(a=[':qindex=0:3,dindex=0:5'])
>>> cm_list = qreq_.execute()
>>> cmsinfo = build_cmsinfo(cm_list, qreq_)
>>> print(ut.repr2(cmsinfo))
Ignore:
wbia -e rank_cmc --db humpbacks -a :has_any=hasnotch,mingt=2 \
-t :proot=BC_DTW --show --nocache-big
wbia -e rank_cmc --db humpbacks -a :is_known=True,mingt=2 \
-t :pipeline_root=BC_DTW
wbia -e rank_cmc --db humpbacks -a :is_known=True \
-t :pipeline_root=BC_DTW \
--qaid=1,9,15,16,18 --daid-override=1,9,15,16,18,21,22 \
--show --debug-depc
--clear-all-depcache
"""
ibs = qreq_.ibs
qaids = qreq_.qaids
daids = qreq_.daids
# Get the groundtruth ranks and accuracy measures
qx2_cminfo = []
for cm in cm_list:
if hasattr(cm, 'extend_results'):
cminfo = cm.extend_results(qreq_).summarize(qreq_)
else:
cminfo = cm.summarize(qreq_)
qx2_cminfo.append(cminfo)
cmsinfo = ut.dict_stack(qx2_cminfo, 'qx2_')
cmsinfo['qx2_gt_rank'] = ut.replace_nones(cmsinfo['qx2_gt_rank'], -1)
if False:
qx2_gtaids = ibs.get_annot_groundtruth(qaids, daid_list=daids)
qx2_avepercision = np.array(
[
cm.get_average_percision(ibs=ibs, gt_aids=gt_aids)
for (cm, gt_aids) in zip(cm_list, qx2_gtaids)
]
)
cmsinfo['qx2_avepercision'] = qx2_avepercision
# Compute mAP score # TODO: use mAP score
# (Actually map score doesn't make much sense if using name scoring
# mAP = qx2_avepercision[~np.isnan(qx2_avepercision)].mean() # NOQA
qaids = qreq_.qaids
# qaids2 = [cm.qaid for cm in cm_list]
# qnids = qreq_.get_qreq_annot_nids(qaids) # TODO: use new nid getter
qnids = ibs.get_annot_nids(qaids)
unique_dnids = np.unique(ibs.get_annot_nids(qreq_.daids))
unique_qnids, groupxs = ut.group_indices(qnids)
cm_group_list = ut.apply_grouping(cm_list, groupxs)
qnid2_aggnamescores = {}
qnx2_nameres_info = []
# Ranked list aggregation over groups of query annots
nameres_info_list = []
for qnid, cm_group in zip(unique_qnids, cm_group_list):
nid2_name_score_group = [
dict([(nid, cm.name_score_list[nidx]) for nid, nidx in cm.nid2_nidx.items()])
for cm in cm_group
]
aligned_name_scores = np.array(
[
ut.dict_take(nid_to_name_score, unique_dnids.tolist(), -np.inf)
for nid_to_name_score in nid2_name_score_group
]
).T
name_score_list = np.nanmax(aligned_name_scores, axis=1)
qnid2_aggnamescores[qnid] = name_score_list
# sort
sortx = name_score_list.argsort()[::-1]
sorted_namescores = name_score_list[sortx]
sorted_dnids = unique_dnids[sortx]
# infer agg name results
success = sorted_dnids == qnid
failure = np.logical_and(~success, sorted_dnids > 0)
gt_name_rank = None if not np.any(success) else np.where(success)[0][0]
gf_name_rank = None if not np.any(failure) else np.nonzero(failure)[0][0]
gt_nid = sorted_dnids[gt_name_rank]
gf_nid = sorted_dnids[gf_name_rank]
gt_name_score = sorted_namescores[gt_name_rank]
gf_name_score = sorted_namescores[gf_name_rank]
if gt_name_score <= 0:
# ensure failure cases are loud give them the worst possible rank
# instead of a random one.
if hasattr(qreq_, 'dnids'):
gt_name_rank = len(qreq_.dnids) + 1
else:
dnids = list(set(ibs.get_annot_nids(qreq_.daids)))
gt_name_rank = len(dnids) + 1
qnx2_nameres_info = {}
qnx2_nameres_info['qnid'] = qnid
qnx2_nameres_info['gt_nid'] = gt_nid
qnx2_nameres_info['gf_nid'] = gf_nid
qnx2_nameres_info['gt_name_rank'] = gt_name_rank
qnx2_nameres_info['gf_name_rank'] = gf_name_rank
qnx2_nameres_info['gt_name_score'] = gt_name_score
qnx2_nameres_info['gf_name_score'] = gf_name_score
nameres_info_list.append(qnx2_nameres_info)
nameres_info = ut.dict_stack(nameres_info_list, 'qnx2_')
cmsinfo.update(nameres_info)
return cmsinfo
[docs]def combine_testres_list(ibs, testres_list):
"""
combine test results over multiple annot configs
The combination of pipeline and annotation config is indexed by cfgx.
A cfgx corresponds to a unique query request
CommandLine:
python -m wbia --tf combine_testres_list
python -m wbia --tf -draw_rank_cmc --db PZ_MTEST --show
python -m wbia --tf -draw_rank_cmc --db PZ_Master1 --show
python -m wbia --tf -draw_rank_cmc --db PZ_MTEST --show -a varysize -t default
python -m wbia --tf -draw_rank_cmc --db PZ_MTEST --show -a varysize -t default
>>> # DISABLE_DOCTEST
>>> from wbia.expt.test_result import * # NOQA
>>> from wbia.expt import harness
>>> ibs, testres = harness.testdata_expts('PZ_MTEST', ['varysize'])
"""
import copy
from wbia.expt import annotation_configs
acfg_list = [tr.acfg for tr in testres_list]
acfg_lbl_list = annotation_configs.get_varied_acfg_labels(acfg_list)
flat_acfg_list = annotation_configs.flatten_acfg_list(acfg_list)
nonvaried_acfg, varied_acfg_list = ut.partition_varied_cfg_list(flat_acfg_list)
def combine_lbls(lbl, acfg_lbl):
if len(lbl) == 0:
return acfg_lbl
if len(acfg_lbl) == 0:
return lbl
return lbl + '+' + acfg_lbl
# TODO: depcirate cfg_dict list for pcfg_list (I think)
agg_cfg_list = ut.flatten([tr.cfg_list for tr in testres_list])
agg_cfgx2_qreq_ = ut.flatten([tr.cfgx2_qreq_ for tr in testres_list])
agg_cfgdict_list = ut.flatten([tr.cfgdict_list for tr in testres_list])
agg_cfgx2_cmsinfo = ut.flatten([tr.cfgx2_cmsinfo for tr in testres_list])
agg_varied_acfg_list = ut.flatten(
[[acfg] * len(tr.cfg_list) for tr, acfg in zip(testres_list, varied_acfg_list)]
)
agg_cfgx2_lbls = ut.flatten(
[
[combine_lbls(lbl, acfg_lbl) for lbl in tr.cfgx2_lbl]
for tr, acfg_lbl in zip(testres_list, acfg_lbl_list)
]
)
agg_cfgx2_acfg = ut.flatten(
[
[copy.deepcopy(acfg)] * len(tr.cfg_list)
for tr, acfg in zip(testres_list, acfg_list)
]
)
big_testres = TestResult(
agg_cfg_list, agg_cfgx2_lbls, agg_cfgx2_cmsinfo, agg_cfgx2_qreq_
)
# Give the big test result an acfg that is common between everything
big_testres.acfg = annotation_configs.unflatten_acfgdict(nonvaried_acfg)
# TODO: cfgdict_list -> pcfg_list
big_testres.cfgdict_list = agg_cfgdict_list # TODO: depricate
big_testres.common_acfg = annotation_configs.compress_aidcfg(big_testres.acfg)
big_testres.common_cfgdict = reduce(ut.dict_intersection, big_testres.cfgdict_list)
big_testres.varied_acfg_list = agg_varied_acfg_list
big_testres.nonvaried_acfg = nonvaried_acfg
big_testres.varied_cfg_list = [
ut.delete_dict_keys(cfgdict.copy(), list(big_testres.common_cfgdict.keys()))
for cfgdict in big_testres.cfgdict_list
]
big_testres.acfg_list = acfg_list
big_testres.cfgx2_acfg = agg_cfgx2_acfg
big_testres.cfgx2_pcfg = agg_cfgdict_list
assert len(agg_cfgdict_list) == len(agg_cfgx2_acfg)
# big_testres.acfg
testres = big_testres
# big_testres = testres
return testres
[docs]@ut.reloadable_class
class TestResult(ut.NiceRepr):
"""
CommandLine:
export SMK_PIPE="smk:nwords=[64000],sv=[False]"
wbia TestResult --db PZ_MTEST -a ctrl -p $SMK_PIPE
wbia TestResult --db Oxford -a oxford -p $SMK_PIPE
Example:
>>> # Script
>>> from wbia.init import main_helpers
>>> import utool as ut
>>> ibs, testres = main_helpers.testdata_expts()
>>> testres.help()
>>> actions = testres.get_actions()
>>> testres.map_score()
>>> ut.qtensure()
>>> prompt = ut.InteractivePrompt(actions)
>>> prompt.loop()
"""
def __init__(testres, cfg_list, cfgx2_lbl, cfgx2_cmsinfo, cfgx2_qreq_):
assert len(cfg_list) == len(cfgx2_lbl), 'bad lengths1: %r != %r' % (
len(cfg_list),
len(cfgx2_lbl),
)
assert len(cfgx2_qreq_) == len(cfgx2_lbl), 'bad lengths2: %r != %r' % (
len(cfgx2_qreq_),
len(cfgx2_lbl),
)
assert len(cfgx2_cmsinfo) == len(cfgx2_lbl), 'bad lengths3: %r != %r' % (
len(cfgx2_cmsinfo),
len(cfgx2_lbl),
)
# TODO rename cfg_list to pcfg_list
testres.cfg_list = cfg_list
testres.cfgx2_lbl = cfgx2_lbl
testres.cfgx2_cmsinfo = cfgx2_cmsinfo
testres.cfgx2_qreq_ = cfgx2_qreq_
# TODO: uncomment
# testres.cfgx2_acfg
# testres.cfgx2_qcfg
# testres.acfg_list = None #
testres.lbl = None
testres.testnameid = None
[docs] @classmethod
def from_cms(cls, cm_list, qreq_):
cfg_list = [qreq_.qparams] # should actually be the specified dict
cfgx2_lbl = ['unspecified']
cmsinfo = build_cmsinfo(cm_list, qreq_)
cfgx2_cmsinfo = [cmsinfo]
cfgx2_qreq_ = [qreq_]
testres = cls(cfg_list, cfgx2_lbl, cfgx2_cmsinfo, cfgx2_qreq_)
return testres
def __str__(testres):
return testres.reconstruct_test_flags()
# def __repr__(testres):
# return testres._custom_str()
def __nice__(testres):
dbname = None if testres.ibs is None else testres.ibs.get_dbname()
# hashkw = dict(_new=True, pathsafe=False)
infostr_ = 'nCfg=%s' % testres.nConfig
if testres.nConfig == 1:
qreq_ = testres.cfgx2_qreq_[0]
infostr_ += ' nQ=%s, nD=%s %s' % (
len(qreq_.qaids),
len(qreq_.daids),
qreq_.get_pipe_hashid(),
)
# nD=%s %s' % (, len(testres.daids), testres.get_pipe_hashid())
nice = '%s %s' % (dbname, infostr_)
return nice
@property
def ibs(testres):
ibs_list = []
for qreq_ in testres.cfgx2_qreq_:
try:
ibs_list.append(qreq_.ibs)
except AttributeError:
ibs_list.append(qreq_.depc.controller)
ibs = ibs_list[0]
for ibs_ in ibs_list:
assert (
ibs.get_dbdir() == ibs_.get_dbdir()
), 'all requests must use the same database'
return ibs
@property
def qaids(testres):
assert (
testres.has_constant_qaids()
), 'must have constant qaids to use this property'
return testres.cfgx2_qaids[0]
# return testres._qaids
@property
def nConfig(testres):
# FIXME: this is the number of requests not the number of
# pipeline configurations
return len(testres.cfg_list)
@property
def unique_pcfgs(testres):
unique_idxs = ut.unique_indices(map(id, testres.cfgx2_pcfg))
return ut.take(testres.cfgx2_pcfg, unique_idxs)
@property
def nQuery(testres):
return len(testres.qaids)
@property
def rank_mat(testres):
return testres.get_rank_mat()
@property
def cfgx2_daids(testres):
daids_list = [qreq_.daids for qreq_ in testres.cfgx2_qreq_]
return daids_list
@property
def cfgx2_qaids(testres):
qaids_list = [qreq_.qaids for qreq_ in testres.cfgx2_qreq_]
return qaids_list
[docs] def has_constant_daids(testres):
return ut.allsame(testres.cfgx2_daids)
[docs] def has_constant_qaids(testres):
return ut.allsame(testres.cfgx2_qaids)
[docs] def has_constant_length_daids(testres):
return ut.allsame(list(map(len, testres.cfgx2_daids)))
[docs] def has_constant_length_qaids(testres):
return ut.allsame(list(map(len, testres.cfgx2_qaids)))
[docs] def get_infoprop_list(testres, key, qaids=None):
"""
key = 'qx2_gt_rank'
key = 'qx2_gt_rank'
qaids = testres.get_test_qaids()
"""
if key == 'participant':
# Get if qaids are part of the config
cfgx2_infoprop = [np.in1d(qaids, aids_) for aids_ in testres.cfgx2_qaids]
else:
_tmp1_cfgx2_infoprop = ut.get_list_column(testres.cfgx2_cmsinfo, key)
_tmp2_cfgx2_infoprop = list(
map(np.array, ut.util_list.replace_nones(_tmp1_cfgx2_infoprop, np.nan))
)
if qaids is None:
cfgx2_infoprop = _tmp2_cfgx2_infoprop
else:
# Use nan if the aid doesnt exist
cfgx2_qaid2_qx = [
dict(zip(aids_, range(len(aids_)))) for aids_ in testres.cfgx2_qaids
]
qxs_list = [
ut.dict_take(qaid2_qx, qaids, None) for qaid2_qx in cfgx2_qaid2_qx
]
cfgx2_infoprop = [
[np.nan if x is None else props[x] for x in qxs]
for props, qxs in zip(_tmp2_cfgx2_infoprop, qxs_list)
]
if key == 'qx2_gt_rank' or key.endswith('_rank'):
# hack
wpr = testres.get_worst_possible_rank()
cfgx2_infoprop = [
np.array([wpr if rank == -1 else rank for rank in infoprop])
for infoprop in cfgx2_infoprop
]
return cfgx2_infoprop
[docs] def get_infoprop_mat(testres, key, qaids=None):
"""
key = 'qx2_gf_raw_score'
key = 'qx2_gt_raw_score'
"""
cfgx2_infoprop = testres.get_infoprop_list(key, qaids)
# concatenate each query rank across configs
infoprop_mat = np.vstack(cfgx2_infoprop).T
return infoprop_mat
[docs] @ut.memoize
def get_rank_mat(testres, qaids=None):
# Ranks of Best Results
rank_mat = testres.get_infoprop_mat(key='qx2_gt_rank', qaids=qaids)
return rank_mat
[docs] def get_worst_possible_rank(testres):
# worst_possible_rank = max(9001, len(testres.daids) + 1)
worst_possible_rank = max([len(qreq_.daids) for qreq_ in testres.cfgx2_qreq_]) + 1
# worst_possible_rank = len(testres.daids) + 1
return worst_possible_rank
[docs] def get_rank_histograms(testres, bins=None, key=None, join_acfgs=False):
"""
Ignore:
testres.get_infoprop_mat('qnx2_gt_name_rank')
testres.get_infoprop_mat('qnx2_gf_name_rank')
testres.get_infoprop_mat('qnx2_qnid')
Example:
>>> # DISABLE_DOCTEST
>>> from wbia.expt.test_result import * # NOQA
>>> from wbia.init import main_helpers
>>> ibs, testres = main_helpers.testdata_expts('testdb1', a=['default'])
>>> bins = 'dense'
>>> key = 'qnx2_gt_name_rank'
>>> config_hists = testres.get_rank_histograms(bins, key=key)
"""
if key is None:
key = 'qx2_gt_rank'
# key = 'qnx2_gt_name_rank'
if bins is None:
bins = testres.get_rank_histogram_bins()
elif bins == 'dense':
bins = np.arange(testres.get_worst_possible_rank() + 1)
cfgx2_ranks = testres.get_infoprop_list(key=key)
# Use numpy histogram repr
cfgx2_hist = np.zeros((len(cfgx2_ranks), len(bins) - 1), dtype=np.int32)
for cfgx, ranks in enumerate(cfgx2_ranks):
freq = np.histogram(ranks, bins=bins)[0]
cfgx2_hist[cfgx] = freq
if join_acfgs:
# Hack for turtles / general way of doing cross validation
# however, we need to change the name
groupxs = testres.get_cfgx_groupxs()
cfgx2_hist = np.array(
[
np.sum(group, axis=0)
for group in ut.apply_grouping(cfgx2_hist, groupxs)
]
)
return cfgx2_hist, bins
[docs] def get_rank_percentage_cumhist(testres, bins='dense', key=None, join_acfgs=False):
r"""
Args:
bins (unicode): (default = u'dense')
key (None): (default = None)
join_acfgs (bool): (default = False)
Returns:
tuple: (config_cdfs, edges)
CommandLine:
python -m wbia --tf TestResult.get_rank_percentage_cumhist
python -m wbia --tf TestResult.get_rank_percentage_cumhist \
-t baseline -a unctrl ctrl
python -m wbia --tf TestResult.get_rank_percentage_cumhist \
--db lynx \
-a default:qsame_imageset=True,been_adjusted=True,excluderef=True \
-t default:K=1 --show --cmd
Example:
>>> # DISABLE_DOCTEST
>>> from wbia.expt.test_result import * # NOQA
>>> from wbia.init import main_helpers
>>> ibs, testres = main_helpers.testdata_expts(
>>> 'testdb1', a=['default:num_names=1,name_offset=[0,1]'])
>>> bins = u'dense'
>>> key = None
>>> (config_cdfs, edges) = testres.get_rank_percentage_cumhist(bins)
>>> result = ('(config_cdfs, edges) = %s' % (str((config_cdfs, edges)),))
>>> print(result)
"""
cfgx2_hist, edges = testres.get_rank_histograms(
bins, key=key, join_acfgs=join_acfgs
)
cfgx2_cumhist = np.cumsum(cfgx2_hist, axis=1)
cfgx2_cumhist_percent = 100 * cfgx2_cumhist / cfgx2_cumhist.T[-1].T[:, None]
return cfgx2_cumhist_percent, edges
[docs] def get_cfgx_groupxs(testres):
r"""
Returns the group indices of configurations specified to be joined.
Ignore:
a = [
'default:minqual=good,require_timestamp=True,view=left,crossval_enc=True,joinme=1',
'default:minqual=good,require_timestamp=True,view=right,crossval_enc=True,joinme=1',
'default:minqual=ok,require_timestamp=True,view=left,crossval_enc=True,joinme=2',
'default:minqual=ok,require_timestamp=True,view=right,crossval_enc=True,joinme=2',
]
>>> a = [
>>> 'default:minqual=good,require_timestamp=True,view=left,crossval_enc=True,joinme=1',
>>> 'default:minqual=good,require_timestamp=True,view=right,crossval_enc=True,joinme=1',
>>> 'default:minqual=ok,require_timestamp=True,view=left,crossval_enc=True,joinme=2',
>>> 'default:minqual=ok,require_timestamp=True,view=right,crossval_enc=True,joinme=2',
>>> ]
>>> from wbia.init import main_helpers
>>> #a = 'default:minqual=good,require_timestamp=True,crossval_enc=True,view=[right,left]'
>>> t = 'default:K=[1]'
>>> ibs, testres = main_helpers.testdata_expts('WWF_Lynx_Copy', a=a, t=t)
>>> testres.get_cfgx_groupxs()
ut.lmap(sum, ut.apply_grouping([len(ut.unique(ibs.annots(aids).nids)) for aids in testres.cfgx2_qaids], testres.get_cfgx_groupxs()))
ut.lmap(sum, ut.apply_grouping([len(ut.unique(ibs.annots(aids))) for aids in testres.cfgx2_qaids], testres.get_cfgx_groupxs()))
Example:
>>> # xdoctest: +REQUIRES(--slow)
>>> # ENABLE_DOCTEST
>>> from wbia.expt.test_result import * # NOQA
>>> from wbia.init import main_helpers
>>> ibs, testres = main_helpers.testdata_expts(
>>> 'PZ_MTEST',
>>> a=['default:qnum_names=1,qname_offset=[0,1],joinme=1,dpername=1',
>>> 'default:qsize=1,dpername=[1,2]'],
>>> t=['default:K=[1,2]'])
>>> groupxs = testres.get_cfgx_groupxs()
>>> result = groupxs
>>> print(result)
[[6], [4], [0, 2], [7], [5], [1, 3]]
"""
# Group-ids for annotations are determined by joinme labels
# (used primarilly in cross-validation)
acfg_joinid = [acfg['qcfg']['joinme'] for acfg in testres.cfgx2_acfg]
# Anything that does not have a joinme groupid is standalone and must
# be given a unique groupid
gen_groupid = it.count(-1, step=-1)
acfg_groupids = [
next(gen_groupid) if grpid is None else grpid for grpid in acfg_joinid
]
# Ensure that different pipeline configs are in different groups
pcfg_groupids = ut.get_varied_cfg_lbls(testres.cfgx2_pcfg)
cfg_groupids = list(zip(pcfg_groupids, acfg_groupids))
groupxs = ut.group_indices(cfg_groupids)[1]
return groupxs
[docs] def get_rank_histogram_bins(testres):
"""easy to see histogram bins"""
worst_possible_rank = testres.get_worst_possible_rank()
if worst_possible_rank > 50:
bins = [0, 1, 5, 50, worst_possible_rank, worst_possible_rank + 1]
elif worst_possible_rank > 5:
bins = [0, 1, 5, worst_possible_rank, worst_possible_rank + 1]
else:
bins = [0, 1, 5]
return bins
[docs] def get_X_LIST(testres):
"""DEPRICATE or refactor"""
# X_LIST = ut.get_argval('--rank-lt-list', type_=list, default=[1])
X_LIST = ut.get_argval('--rank-lt-list', type_=list, default=[1, 5])
return X_LIST
[docs] def get_nLessX_dict(testres):
"""
Build a (histogram) dictionary mapping X (as in #ranks < X) to a list
of cfg scores
"""
X_LIST = testres.get_X_LIST()
nLessX_dict = {int(X): np.zeros(testres.nConfig) for X in X_LIST}
cfgx2_qx2_gt_rank = testres.get_infoprop_list('qx2_gt_rank')
for X in X_LIST:
cfgx2_lessX_mask = [
np.logical_and(0 <= qx2_gt_ranks, qx2_gt_ranks < X)
for qx2_gt_ranks in cfgx2_qx2_gt_rank
]
cfgx2_nLessX = np.array([lessX_.sum(axis=0) for lessX_ in cfgx2_lessX_mask])
nLessX_dict[int(X)] = cfgx2_nLessX
return nLessX_dict
[docs] def get_all_varied_params(testres):
r"""
Returns the parameters that were varied between different
configurations in this test
Returns:
list: varied_params
CommandLine:
python -m wbia TestResult.get_all_varied_params
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.expt.test_result import * # NOQA
>>> import wbia
>>> testres = wbia.testdata_expts(
>>> 'PZ_MTEST', t='default:K=[1,2]')[1]
>>> varied_params = sorted(testres.get_all_varied_params())
>>> result = ('varied_params = %s' % (ut.repr2(varied_params),))
>>> print(result)
varied_params = ['K', '_cfgindex']
"""
# only for big results
varied_cfg_params = list(
set(ut.flatten([cfgdict.keys() for cfgdict in testres.varied_cfg_list]))
)
varied_acfg_params = list(
set(ut.flatten([acfg.keys() for acfg in testres.varied_acfg_list]))
)
varied_params = varied_acfg_params + varied_cfg_params
return varied_params
[docs] def get_total_num_varied_params(testres):
return len(testres.get_all_varied_params())
[docs] def get_param_basis(testres, key):
"""
Returns what a param was varied between over all tests
key = 'K'
key = 'dcfg_sample_size'
"""
if key == 'len(daids)':
basis = sorted(list(set([len(daids) for daids in testres.cfgx2_daids])))
elif any([key in cfgdict for cfgdict in testres.varied_cfg_list]):
basis = sorted(
list(set([cfgdict[key] for cfgdict in testres.varied_cfg_list]))
)
elif any([key in cfgdict for cfgdict in testres.varied_acfg_list]):
basis = sorted(list(set([acfg[key] for acfg in testres.varied_acfg_list])))
else:
# assert False, 'param is not varied'
if key in testres.common_cfgdict:
basis = [testres.common_cfgdict[key]]
elif key in testres.nonvaried_acfg:
basis = [testres.nonvaried_acfg[key]]
else:
assert False, 'param=%r doesnt exist' % (key,)
return basis
[docs] def get_param_val_from_cfgx(testres, cfgx, key):
if key == 'len(daids)':
return len(testres.cfgx2_daids[cfgx])
# --- HACK - the keys are different in varied dict for some reason ---
elif any([key in cfgdict for cfgdict in testres.varied_cfg_list]):
return testres.varied_cfg_list[cfgx][key]
elif any([key in cfgdict for cfgdict in testres.varied_acfg_list]):
return testres.varied_acfg_list[cfgx][key]
# --- / Hack
elif any([key in cfgdict for cfgdict in testres.cfgx2_pcfg]):
return testres.cfgx2_pcfg[cfgx][key]
elif any([key in cfgdict for cfgdict in testres.cfgx2_acfg]):
return testres.cfgx2_acfg[cfgx][key]
else:
assert False, 'param=%r doesnt exist' % (key,)
[docs] def get_cfgx_with_param(testres, key, val):
"""
Gets configs where the given parameter is held constant
"""
if key == 'len(daids)':
cfgx_list = [
cfgx
for cfgx, daids in enumerate(testres.cfgx2_daids)
if len(daids) == val
]
elif any([key in cfgdict for cfgdict in testres.varied_cfg_list]):
cfgx_list = [
cfgx
for cfgx, cfgdict in enumerate(testres.varied_cfg_list)
if cfgdict[key] == val
]
elif any([key in cfgdict for cfgdict in testres.varied_acfg_list]):
cfgx_list = [
cfgx
for cfgx, acfg in enumerate(testres.varied_acfg_list)
if acfg[key] == val
]
else:
if key in testres.common_cfgdict:
cfgx_list = list(range(testres.nConfig))
elif key in testres.nonvaried_acfg:
cfgx_list = list(range(testres.nConfig))
else:
assert False, 'param=%r doesnt exist' % (key,)
# assert False, 'param is not varied'
return cfgx_list
[docs] def get_pipecfg_args(testres):
if '_cfgstr' in testres.common_cfgdict:
pipecfg_args = [testres.common_cfgdict['_cfgstr']]
else:
pipecfg_args = ut.unique_ordered(
[cfg['_cfgstr'] for cfg in testres.varied_cfg_list]
)
return ' '.join(pipecfg_args)
[docs] def get_annotcfg_args(testres):
"""
CommandLine:
# TODO: More robust fix
# To reproduce the error
wbia -e rank_cmc --db humpbacks_fb -a default:mingt=2,qsize=10,dsize=100 default:qmingt=2,qsize=10,dsize=100 -t default:proot=BC_DTW,decision=max,crop_dim_size=500,crop_enabled=True,manual_extract=False,use_te_scorer=True,ignore_notch=True,te_score_weight=0.5 --show
"""
if '_cfgstr' in testres.common_acfg['common']:
annotcfg_args = [testres.common_acfg['common']['_cfgstr']]
else:
try:
annotcfg_args = ut.unique_ordered(
[acfg['common']['_cfgstr'] for acfg in testres.varied_acfg_list]
)
except KeyError:
# HACK FIX
try:
annotcfg_args = ut.unique_ordered(
[acfg['_cfgstr'] for acfg in testres.varied_acfg_list]
)
except KeyError:
annotcfg_args = ut.unique_ordered(
[acfg['qcfg__cfgstr'] for acfg in testres.varied_acfg_list]
)
return ' '.join(annotcfg_args)
[docs] def reconstruct_test_flags(testres):
flagstr = ' '.join(
[
'-a ' + testres.get_annotcfg_args(),
'-t ' + testres.get_pipecfg_args(),
'--db ' + testres.ibs.get_dbname(),
]
)
return flagstr
[docs] def get_full_cfgstr(testres, cfgx):
"""both qannots and dannots included"""
full_cfgstr = testres.cfgx2_qreq_[cfgx].get_full_cfgstr()
return full_cfgstr
[docs] @ut.memoize
def get_cfgstr(testres, cfgx):
"""just dannots and config_str"""
cfgstr = testres.cfgx2_qreq_[cfgx].get_cfgstr()
return cfgstr
def _shorten_lbls(testres, lbl):
"""
hacky function
"""
import re
repl_list = [
('candidacy_', ''),
('viewpoint_compare', 'viewpoint'),
# ('custom', 'default'),
# ('fg_on', 'FG'),
# ('fg_on=True', 'FG'),
# ('fg_on=False,?', ''),
('fg_on=True', 'FG=True'),
('fg_on=False,?', 'FG=False'),
('lnbnn_on=True', 'LNBNN'),
('lnbnn_on=False,?', ''),
('normonly_on=True', 'normonly'),
('normonly_on=False,?', ''),
('bar_l2_on=True', 'dist'),
('bar_l2_on=False,?', ''),
(r'joinme=\d+,?', ''),
('dcrossval_enc', 'denc_per_name'),
('sv_on', 'SV'),
('rotation_invariance', 'RI'),
('affine_invariance', 'AI'),
('query_rotation_heuristic', 'QRH'),
('nNameShortlistSVER', 'nRR'),
#
# ('sample_per_ref_name', 'per_ref_name'),
('sample_per_ref_name', 'per_gt_name'),
('require_timestamp=True', 'require_timestamp'),
('require_timestamp=False,?', ''),
('require_timestamp=None,?', ''),
('[_A-Za-z]*=None,?', ''),
('dpername=None,?', ''),
# ???
# ('sample_per_ref_name', 'per_gt_name'),
# ('per_name', 'per_gf_name'), # Try to make labels clearer for paper
# ----
# ('prescore_method=\'?csum\'?,score_method=\'?csum\'?,?', 'amech'),
# ('prescore_method=\'?nsum\'?,score_method=\'?nsum\'?,?', 'fmech'),
("prescore_method='?csum'?,score_method='?csum'?,?", 'mech=annot'),
("prescore_method='?nsum'?,score_method='?nsum'?,?", 'mech=name'),
('force_const_size=[^,]+,?', ''),
(r'[dq]?_true_size=\d+,?', ''),
(r'[dq]?_orig_size=[^,]+,?', ''),
# Hack
(
'[qd]?exclude_reference='
+ ut.regex_or(['True', 'False', 'None'])
+ r'\,?',
'',
),
# ('=True', '=On'),
# ('=False', '=Off'),
('=True', '=T'),
('=False', '=F'),
(',$', ''),
]
for ser, rep in repl_list:
lbl = re.sub(ser, rep, lbl)
return lbl
[docs] def get_short_cfglbls(testres, join_acfgs=False):
"""
Labels for published tables
cfg_lbls = ['baseline:nRR=200+default:', 'baseline:+default:']
CommandLine:
python -m wbia --tf TestResult.get_short_cfglbls
Example:
>>> # SLOW_DOCTEST
>>> from wbia.expt.test_result import * # NOQA
>>> import wbia
>>> ibs, testres = wbia.testdata_expts('PZ_MTEST', a=['ctrl:size=10'],
>>> t=['default:dim_size=[450,550]'])
>>> cfg_lbls = testres.get_short_cfglbls()
>>> result = ('cfg_lbls = %s' % (ut.repr2(cfg_lbls),))
>>> print(result)
cfg_lbls = [
'default:dim_size=450+ctrl',
'default:dim_size=550+ctrl',
]
"""
from wbia.expt import annotation_configs
if False:
acfg_names = [acfg['qcfg']['_cfgstr'] for acfg in testres.cfgx2_acfg]
pcfg_names = [pcfg['_cfgstr'] for pcfg in testres.cfgx2_pcfg]
# Only vary the label settings within the cfgname
acfg_hashes = np.array(list(map(hash, acfg_names)))
unique_hashes, a_groupxs = vt.group_indices(acfg_hashes)
a_label_groups = []
for groupx in a_groupxs:
acfg_list = ut.take(testres.cfgx2_acfg, groupx)
varied_lbls = annotation_configs.get_varied_acfg_labels(
acfg_list, mainkey='_cfgstr'
)
a_label_groups.append(varied_lbls)
acfg_lbls = vt.invert_apply_grouping(a_label_groups, a_groupxs)
pcfg_hashes = np.array(list(map(hash, pcfg_names)))
unique_hashes, p_groupxs = vt.group_indices(pcfg_hashes)
p_label_groups = []
for groupx in p_groupxs:
pcfg_list = ut.take(testres.cfgx2_pcfg, groupx)
varied_lbls = ut.get_varied_cfg_lbls(pcfg_list, mainkey='_cfgstr')
p_label_groups.append(varied_lbls)
pcfg_lbls = vt.invert_apply_grouping(p_label_groups, p_groupxs)
cfg_lbls = [albl + '+' + plbl for albl, plbl in zip(acfg_lbls, pcfg_lbls)]
else:
cfg_lbls_ = testres.cfgx2_lbl[:]
cfg_lbls_ = [testres._shorten_lbls(lbl) for lbl in cfg_lbls_]
# split configs up by param and annots
pa_tups = [lbl.split('+') for lbl in cfg_lbls_]
cfg_lbls = []
for pa in pa_tups:
new_parts = []
for part in pa:
_tup = part.split(ut.NAMEVARSEP)
name, settings = _tup if len(_tup) > 1 else (_tup[0], '')
new_parts.append(part if settings else name)
if len(new_parts) == 2 and new_parts[1] == 'default':
newlbl = new_parts[0]
else:
newlbl = '+'.join(new_parts)
cfg_lbls.append(newlbl)
if join_acfgs:
groupxs = testres.get_cfgx_groupxs()
group_lbls = []
for group in ut.apply_grouping(cfg_lbls, groupxs):
num_parts = 0
part_dicts = []
for lbl in group:
parts = []
for count, pa in enumerate(lbl.split('+')):
num_parts = max(num_parts, count + 1)
cfgdict = cfghelpers.parse_cfgstr_list2([pa], strict=False)[0][0]
parts.append(cfgdict)
part_dicts.append(parts)
group_lbl_parts = []
for px in range(num_parts):
cfgs = ut.take_column(part_dicts, px)
nonvaried_cfg = ut.partition_varied_cfg_list(cfgs)[0]
group_lbl_parts.append(ut.get_cfg_lbl(nonvaried_cfg))
# logger.info('nonvaried_lbl = %r' % (nonvaried_lbl,))
group_lbl = '+'.join(group_lbl_parts)
group_lbls.append(group_lbl)
cfg_lbls = group_lbls
return cfg_lbls
[docs] def get_varied_labels(testres, shorten=False, join_acfgs=False, sep=''):
"""
Returns labels indicating only the parameters that have been varied between
different annot/pipeline configurations.
Helper for consistent figure titles
CommandLine:
python -m wbia --tf TestResult.make_figtitle --prefix "Seperability " --db GIRM_Master1 -a timectrl -t Ell:K=2 --hargv=scores
python -m wbia --tf TestResult.make_figtitle
python -m wbia TestResult.get_varied_labels
Example:
>>> # SLOW_DOCTEST
>>> from wbia.expt.test_result import * # NOQA
>>> import wbia
>>> ibs, testres = wbia.testdata_expts(
>>> 'PZ_MTEST', t='default:K=[1,2]',
>>> #a=['timectrl:qsize=[1,2],dsize=[3,4]']
>>> a=[
>>> 'default:qsize=[1,2],dsize=2,joinme=1,view=left',
>>> 'default:qsize=2,dsize=3,joinme=1,view=primary',
>>> 'default:qsize=[3,2],dsize=4,joinme=2,view=left',
>>> 'default:qsize=4,dsize=5,joinme=2,view=primary',
>>> ]
>>> )
>>> # >>> ibs, testres = wbia.testdata_expts(
>>> # >>> 'WWF_Lynx_Copy', t='default:K=1',
>>> # >>> a=[
>>> # >>> 'default:minqual=good,require_timestamp=True,view=left,dcrossval_enc=1,joinme=1',
>>> # >>> 'default:minqual=good,require_timestamp=True,view=left,dcrossval_enc=2,joinme=2',
>>> # >>> #'default:minqual=good,require_timestamp=True,view=left,dcrossval_enc=3,joinme=3',
>>> # >>> 'default:minqual=good,require_timestamp=True,view=right,dcrossval_enc=1,joinme=1',
>>> # >>> 'default:minqual=good,require_timestamp=True,view=right,dcrossval_enc=2,joinme=2',
>>> # >>> #'default:minqual=good,require_timestamp=True,view=right,dcrossval_enc=3,joinme=3',
>>> # >>> ]
>>> # >>> )
>>> varied_lbls = testres.get_varied_labels(shorten=False, join_acfgs=True)
>>> result = ('varied_lbls = %s' % (ut.repr2(varied_lbls, strvals=True, nl=2),))
>>> print(result)
varied_lbls = [u'K=1+qsize=1', u'K=2+qsize=1', u'K=1+qsize=2', u'K=2+qsize=2']
"""
from wbia.expt import annotation_configs
varied_acfgs = annotation_configs.get_varied_acfg_labels(
testres.cfgx2_acfg, checkname=True
)
# logger.info('varied_acfgs = %s' % (ut.repr2(varied_acfgs, nl=2),))
# logger.info('testres.cfgx2_acfg = %s' % (ut.repr3(testres.cfgx2_acfg),))
varied_pcfgs = ut.get_varied_cfg_lbls(testres.cfgx2_pcfg, checkname=True)
# varied_acfgs = ut.get_varied_cfg_lbls(testres.cfgx2_acfg, checkname=True)
name_sep = ':'
cfg_sep = '+'
if join_acfgs:
# Hack for the grouped config problem
new_varied_acfgs = []
groupxs = testres.get_cfgx_groupxs()
grouped_acfgs = ut.apply_grouping(varied_acfgs, groupxs)
grouped_pcfgs = ut.apply_grouping(varied_pcfgs, groupxs)
for group in grouped_acfgs:
group = [p if name_sep in p else name_sep + p for p in group]
# Re-parse given back into dictionary form
cfgdicts_ = cfghelpers.parse_cfgstr_list2(group, strict=False)
# I forget why these are stored in a 2d-list
cfgdicts = ut.take_column(cfgdicts_, 0)
new_acfgs = ut.partition_varied_cfg_list(cfgdicts)
# Hack, just taking the first one that has agreement between
# joinme / crossvalidation runs
new_acfg = new_acfgs[0]
if True:
# look at internal variance within xval runs
internal_cfgs = new_acfgs[1]
import pandas as pd
intern_variations = pd.DataFrame.from_dict(internal_cfgs).to_dict(
orient='list'
)
op_prefixes = {
'sum': (np.sum, 'Σ-', ''),
'mean': (np.mean, 'µ-', ''),
'set': (lambda x: '&'.join(set(map(str, x))), '', 's'),
}
known_modes = {
'dsize': 'mean',
'qsize': 'sum',
'view': 'set',
}
for key in intern_variations.keys():
if key.startswith('_'):
continue
mode = known_modes.get(key, None)
vals = intern_variations[key]
if mode is None:
mode = 'set'
if key == 'crossval_idx':
new_acfg['folds'] = len(intern_variations['crossval_idx'])
else:
op, pref, suff = op_prefixes[mode]
c = op(vals)
if isinstance(c, str):
new_acfg[pref + key + suff] = c
else:
new_acfg[pref + key + suff] = ut.repr2(c, precision=2)
# if 'dsize' in intern_variations:
# new_acfg['µ-dsize'] = np.sum(intern_variations['dsize'])
# if 'qsize' in intern_variations:
# new_acfg['Σ-qsize'] = np.sum(intern_variations['qsize'])
# if 'view' in intern_variations:
# new_acfg['views'] = '&'.join(set(intern_variations['view']))
# if 'crossval_idx' in intern_variations:
# new_acfg['folds'] = len(intern_variations['crossval_idx'])
new_varied_acfgs.append(new_acfg)
# Do one more dup check to remove the duplicate summaries
common_new_acfg = ut.partition_varied_cfg_list(new_varied_acfgs)[0]
for key in common_new_acfg.keys():
if not key.startswith('_'):
for new_acfg in new_varied_acfgs:
del new_acfg[key]
varied_pcfgs = ut.take_column(grouped_pcfgs, 0)
varied_acfgs = [
ut.get_cfg_lbl(new_acfg_, with_name=False, sep=sep)
for new_acfg_ in new_varied_acfgs
]
def combo_lbls(lbla, lblp):
parts = []
if lbla != name_sep and lbla:
parts.append(lbla)
if lblp != name_sep and lblp:
parts.append(lblp)
return (sep + cfg_sep).join(parts)
varied_lbls = [
combo_lbls(lbla, lblp) for lblp, lbla in zip(varied_acfgs, varied_pcfgs)
]
if shorten:
varied_lbls = [testres._shorten_lbls(lbl) for lbl in varied_lbls]
return varied_lbls
[docs] def get_sorted_config_labels(testres):
"""
helper
"""
key = 'qx2_gt_rank'
cfgx2_cumhist_percent, edges = testres.get_rank_percentage_cumhist(
bins='dense', key=key
)
label_list = testres.get_short_cfglbls()
label_list = [
('%6.2f%%' % (percent,))
# ut.scalar_str(percent, precision=2)
+ ' - ' + label
for percent, label in zip(cfgx2_cumhist_percent.T[0], label_list)
]
sortx = cfgx2_cumhist_percent.T[0].argsort()[::-1]
label_list = ut.take(label_list, sortx)
return label_list
[docs] def make_figtitle(testres, plotname='', filt_cfg=None):
"""
Helper for consistent figure titles
CommandLine:
python -m wbia --tf TestResult.make_figtitle --prefix "Seperability " --db GIRM_Master1 -a timectrl -t Ell:K=2 --hargv=scores
python -m wbia --tf TestResult.make_figtitle
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.expt.test_result import * # NOQA
>>> import wbia
>>> ibs, testres = wbia.testdata_expts('PZ_MTEST')
>>> plotname = ''
>>> figtitle = testres.make_figtitle(plotname)
>>> result = ('figtitle = %r' % (figtitle,))
>>> print(result)
"""
figtitle_prefix = ut.get_argval('--prefix', type_=str, default='')
if figtitle_prefix != '':
figtitle_prefix = figtitle_prefix.rstrip() + ' '
figtitle = figtitle_prefix + plotname
hasprefix = figtitle_prefix == ''
if hasprefix:
figtitle += '\n'
title_aug = testres.get_title_aug(friendly=True, with_cfg=hasprefix)
figtitle += ' ' + title_aug
if filt_cfg is not None:
filt_cfgstr = ut.get_cfg_lbl(filt_cfg)
if filt_cfgstr.strip() != ':':
figtitle += ' ' + filt_cfgstr
return figtitle
[docs] def get_title_aug(
testres, with_size=True, with_db=True, with_cfg=True, friendly=False
):
r"""
Args:
with_size (bool): (default = True)
Returns:
str: title_aug
CommandLine:
python -m wbia --tf TestResult.get_title_aug --db PZ_Master1 -a timequalctrl::timectrl
Example:
>>> # DISABLE_DOCTEST
>>> from wbia.expt.test_result import * # NOQA
>>> import wbia
>>> ibs, testres = wbia.testdata_expts('PZ_MTEST')
>>> with_size = True
>>> title_aug = testres.get_title_aug(with_size)
>>> res = u'title_aug = %s' % (title_aug,)
>>> print(res)
"""
ibs = testres.ibs
title_aug = ''
if with_db:
title_aug += 'db=' + (ibs.get_dbname())
if with_cfg:
try:
if '_cfgname' in testres.common_acfg['common']:
try:
annot_cfgname = testres.common_acfg['common']['_cfgstr']
except KeyError:
annot_cfgname = testres.common_acfg['common']['_cfgname']
else:
cfgname_list = [
cfg['dcfg__cfgname'] for cfg in testres.varied_acfg_list
]
cfgname_list = ut.unique_ordered(cfgname_list)
annot_cfgname = '[' + ','.join(cfgname_list) + ']'
try:
pipeline_cfgname = testres.common_cfgdict['_cfgstr']
except KeyError:
# pipeline_cfgname = testres.common_cfgdict['_cfgname']
cfgstr_list = [cfg['_cfgstr'] for cfg in testres.varied_cfg_list]
uniuqe_cfgstrs = ut.unique_ordered(cfgstr_list)
pipeline_cfgname = '[' + ','.join(uniuqe_cfgstrs) + ']'
annot_cfgname = testres._shorten_lbls(annot_cfgname)
pipeline_cfgname = testres._shorten_lbls(pipeline_cfgname)
# hack turn these off if too long
if len(annot_cfgname) < 64:
title_aug += ' a=' + annot_cfgname
if len(pipeline_cfgname) < 64:
title_aug += ' t=' + pipeline_cfgname
except Exception as ex:
logger.info(ut.repr2(testres.common_acfg))
logger.info(ut.repr2(testres.common_cfgdict))
ut.printex(ex)
raise
if with_size:
if ut.get_argflag('--hack_size_nl'):
title_aug += '\n'
if testres.has_constant_qaids():
title_aug += ' #qaids=%r' % (len(testres.qaids),)
elif testres.has_constant_length_qaids():
title_aug += ' #qaids=%r*' % (len(testres.cfgx2_qaids[0]),)
if testres.has_constant_daids():
daids = testres.cfgx2_daids[0]
title_aug += ' #daids=%r' % (len(testres.cfgx2_daids[0]),)
if testres.has_constant_qaids():
all_daid_per_name_stats = ut.get_stats(
ibs.get_num_annots_per_name(daids)[0], use_nan=True
)
if all_daid_per_name_stats['std'] == 0:
title_aug += ' dper_name=%s' % (
ut.scalar_str(
all_daid_per_name_stats['mean'], max_precision=2
),
)
else:
title_aug += ' dper_name=%s±%s' % (
ut.scalar_str(all_daid_per_name_stats['mean'], precision=2),
ut.scalar_str(all_daid_per_name_stats['std'], precision=2),
)
elif testres.has_constant_length_daids():
daids = testres.cfgx2_daids[0]
title_aug += ' #daids=%r*' % (len(testres.cfgx2_daids[0]),)
if friendly:
# Hackiness for friendliness
# title_aug = title_aug.replace('db=PZ_Master1', 'Plains Zebras')
# title_aug = title_aug.replace('db=NNP_MasterGIRM_core', 'Masai Giraffes')
# title_aug = title_aug.replace('db=GZ_ALL', 'Grevy\'s Zebras')
title_aug = ut.multi_replace(
title_aug,
list(ibs.const.DBNAME_ALIAS.keys()),
list(ibs.const.DBNAME_ALIAS.values()),
)
# title_aug = title_aug.replace('db=PZ_Master1', 'db=PZ')
# title_aug = title_aug.replace('db=NNP_MasterGIRM_core', 'Masai Giraffes')
# title_aug = title_aug.replace('db=GZ_ALL', 'Grevy\'s Zebras')
return title_aug
[docs] def get_fname_aug(testres, **kwargs):
import re
title_aug = testres.get_title_aug(**kwargs)
valid_regex = '-a-zA-Z0-9_.() '
valid_extra = '=,'
valid_regex += valid_extra
title_aug = title_aug.replace(' ', '_') # spaces suck
fname_aug = re.sub('[^' + valid_regex + ']+', '', title_aug)
fname_aug = fname_aug.strip('_')
return fname_aug
[docs] def print_pcfg_info(testres):
"""
Prints verbose information about each pipeline configuration
>>> from wbia.expt.test_result import * # NOQA
"""
# TODO: Rectify with other printers
# for pcfgx, (pipecfg, lbl) in enumerate(zip(pipecfg_list, pipecfg_lbls)):
# logger.info('+--- %d / %d ===' % (pcfgx, (len(pipecfg_list))))
# ut.colorprint(lbl, 'white')
# logger.info(pipecfg.get_cfgstr())
# logger.info('L___')
# for qreq_ in testres.cfgx2_qreq_:
# logger.info(qreq_.get_full_cfgstr())
# cfgdict_list = [qreq_.qparams for qreq_ in testres.cfgx2_qreq_]
experiment_helpers.print_pipe_configs(testres.cfgx2_pcfg, testres.cfgx2_qreq_)
[docs] def print_acfg_info(testres, **kwargs):
"""
Prints verbose information about the annotations used in each test
configuration
CommandLine:
python -m wbia --tf TestResult.print_acfg_info
Kwargs:
see ibs.get_annot_stats_dict
hashid, per_name, per_qual, per_vp, per_name_vpedge, per_image,
min_name_hourdist
Example:
>>> # DISABLE_DOCTEST
>>> from wbia.expt.test_result import * # NOQA
>>> import wbia
>>> ibs, testres = wbia.testdata_expts('PZ_MTEST',
>>> a=['ctrl::unctrl_comp'],
>>> t=['candk:K=[1,2]'])
>>> ibs = None
>>> result = testres.print_acfg_info()
>>> print(result)
"""
from wbia.expt import annotation_configs
ibs = testres.ibs
# Get unique annotation configs
cfgx2_acfg_label = annotation_configs.get_varied_acfg_labels(testres.cfgx2_acfg)
flags = ut.flag_unique_items(cfgx2_acfg_label)
qreq_list = ut.compress(testres.cfgx2_qreq_, flags)
acfg_list = ut.compress(testres.cfgx2_acfg, flags)
expanded_aids_list = [(qreq_.qaids, qreq_.daids) for qreq_ in qreq_list]
annotation_configs.print_acfg_list(acfg_list, expanded_aids_list, ibs, **kwargs)
[docs] def print_unique_annot_config_stats(testres, ibs=None):
r"""
Args:
ibs (IBEISController): wbia controller object(default = None)
CommandLine:
python -m wbia TestResult.print_unique_annot_config_stats
Example:
>>> # DISABLE_DOCTEST
>>> from wbia.expt.test_result import * # NOQA
>>> import wbia
>>> testres = wbia.testdata_expts('PZ_MTEST', a=['ctrl::unctrl_comp'])
>>> ibs = None
>>> result = testres.print_unique_annot_config_stats(ibs)
>>> print(result)
"""
if ibs is None:
ibs = testres.ibs
cfx2_dannot_hashid = [
ibs.get_annot_hashid_visual_uuid(daids) for daids in testres.cfgx2_daids
]
unique_daids = ut.compress(
testres.cfgx2_daids, ut.flag_unique_items(cfx2_dannot_hashid)
)
with ut.Indenter('[acfgstats]'):
logger.info('+====')
logger.info('Printing %d unique annotconfig stats' % (len(unique_daids)))
common_acfg = testres.common_acfg
common_acfg['common'] = ut.dict_filter_nones(common_acfg['common'])
logger.info('testres.common_acfg = ' + ut.repr2(common_acfg))
logger.info(
'param_basis(len(daids)) = %r' % (testres.get_param_basis('len(daids)'),)
)
for count, daids in enumerate(unique_daids):
logger.info('+---')
logger.info('acfgx = %r/%r' % (count, len(unique_daids)))
if testres.has_constant_qaids():
ibs.print_annotconfig_stats(testres.qaids, daids)
else:
ibs.print_annot_stats(daids, prefix='d')
logger.info('L___')
[docs] def report(testres):
testres.print_results()
[docs] def print_results(testres, **kwargs):
r"""
CommandLine:
python -m wbia --tf TestResult.print_results
Example:
>>> # DISABLE_DOCTEST
>>> from wbia.expt.test_result import * # NOQA
>>> from wbia.expt import harness
>>> ibs, testres = harness.testdata_expts('PZ_MTEST')
>>> result = testres.print_results()
>>> print(result)
"""
from wbia.expt import experiment_printres
ibs = testres.ibs
experiment_printres.print_results(ibs, testres, **kwargs)
[docs] def get_common_qaids(testres):
if not testres.has_constant_qaids():
# Get only cases the tests share for now
common_qaids = reduce(np.intersect1d, testres.cfgx2_qaids)
return common_qaids
else:
return testres.qaids
[docs] def get_all_qaids(testres):
all_qaids = np.array(ut.unique(ut.flatten(testres.cfgx2_qaids)))
return all_qaids
[docs] def get_test_qaids(testres):
# Transition fucntion
return testres.get_all_qaids()
# return testres.get_common_qaids()
# all_qaids = ut.unique(ut.flatten(testres.cfgx2_qaids))
# return all_qaids
[docs] def case_sample2(testres, filt_cfg, qaids=None, return_mask=False, verbose=None):
r"""
Filters individual test result cases based on how they performed, what
tags they had, and various other things.
Args:
filt_cfg (dict):
Returns:
list: case_pos_list (list of (qx, cfgx)) or isvalid mask
CommandLine:
python -m wbia TestResult.case_sample2
python -m wbia TestResult.case_sample2:0
python -m wbia TestResult.case_sample2:1 --db GZ_ALL --filt :min_tags=1
python -m wbia TestResult.case_sample2:1 --db PZ_Master1 --filt :min_gf_tags=1
python -m wbia TestResult.case_sample2:2 --db PZ_Master1
Example:
>>> # DISABLE_DOCTEST
>>> # The same results is achievable with different filter config settings
>>> from wbia.expt.test_result import * # NOQA
>>> from wbia.init import main_helpers
>>> verbose = True
>>> ibs, testres = main_helpers.testdata_expts('PZ_MTEST', a=['ctrl'])
>>> filt_cfg1 = {'fail': True}
>>> case_pos_list1 = testres.case_sample2(filt_cfg1)
>>> filt_cfg2 = {'min_gtrank': 1}
>>> case_pos_list2 = testres.case_sample2(filt_cfg2)
>>> filt_cfg3 = {'min_gtrank': 0}
>>> case_pos_list3 = testres.case_sample2(filt_cfg3)
>>> filt_cfg4 = {}
>>> case_pos_list4 = testres.case_sample2(filt_cfg4)
>>> assert np.all(case_pos_list1 == case_pos_list2), 'should be equiv configs'
>>> assert np.any(case_pos_list2 != case_pos_list3), 'should be diff configs'
>>> assert np.all(case_pos_list3 == case_pos_list4), 'should be equiv configs'
>>> ibs, testres = main_helpers.testdata_expts('PZ_MTEST', a=['ctrl'], t=['default:sv_on=[True,False]'])
>>> filt_cfg5 = filt_cfg1.copy()
>>> mask5 = testres.case_sample2(filt_cfg5, return_mask=True)
>>> case_pos_list5 = testres.case_sample2(filt_cfg5, return_mask=False)
>>> assert len(mask5.shape) == 2
>>> assert np.all(mask5.T[0] == mask5.T[1])
>>> filt_cfg6 = {'fail': True, 'allcfg': True}
>>> mask6 = testres.case_sample2(filt_cfg6, return_mask=True)
>>> assert np.all(mask6.T[0] == mask6.T[1])
>>> print(mask5)
>>> print(case_pos_list5)
>>> filt_cfg = filt_cfg7 = {'disagree': True}
>>> case_pos_list7 = testres.case_sample2(filt_cfg7, verbose=verbose)
>>> print(case_pos_list7)
Example:
>>> # SCRIPT
>>> from wbia.expt.test_result import * # NOQA
>>> from wbia.init import main_helpers
>>> ibs, testres = main_helpers.testdata_expts('PZ_MTEST', a=['ctrl'])
>>> filt_cfg = main_helpers.testdata_filtcfg()
>>> case_pos_list = testres.case_sample2(filt_cfg)
>>> result = ('case_pos_list = %s' % (str(case_pos_list),))
>>> print(result)
>>> # Extra stuff
>>> all_tags = testres.get_all_tags()
>>> selcted_tags = ut.take(all_tags, case_pos_list.T[0])
>>> print('selcted_tags = %r' % (selcted_tags,))
Example:
>>> # DISABLE_DOCTEST
>>> from wbia.expt.test_result import * # NOQA
>>> from wbia.init import main_helpers
>>> ibs, testres = main_helpers.testdata_expts('PZ_MTEST', a=['ctrl'], t=['default:K=[1,2,3]'])
>>> ut.exec_funckw(testres.case_sample2, globals())
>>> filt_cfg = {'fail': True, 'min_gtrank': 1, 'max_gtrank': None, 'min_gf_timedelta': '24h'}
>>> ibs, testres = main_helpers.testdata_expts('humpbacks_fb', a=['default:has_any=hasnotch,mingt=2,qindex=0:300,dindex=0:300'], t=['default:proot=BC_DTW,decision=max,crop_dim_size=500,crop_enabled=True,manual_extract=False,use_te_scorer=True,ignore_notch=True,te_net=annot_simple', 'default:proot=vsmany'], qaid_override=[12])
>>> filt_cfg = ':disagree=True,index=0:8,min_gtscore=.00001,require_all_cfg=True'
>>> #filt_cfg = cfghelpers.parse_argv_cfg('--filt')[0]
>>> case_pos_list = testres.case_sample2(filt_cfg, verbose=True)
>>> result = ('case_pos_list = %s' % (str(case_pos_list),))
>>> print(result)
>>> # Extra stuff
>>> all_tags = testres.get_all_tags()
>>> selcted_tags = ut.take(all_tags, case_pos_list.T[0])
>>> print('selcted_tags = %r' % (selcted_tags,))
logger.info('qaid = %r' % (qaid,))
logger.info('qx = %r' % (qx,))
logger.info('cfgxs = %r' % (cfgxs,))
# print testres info about this item
take_cfgs = ut.partial(ut.take, index_list=cfgxs)
take_qx = ut.partial(ut.take, index_list=qx)
truth_cfgs = ut.hmap_vals(take_qx, truth2_prop)
truth_item = ut.hmap_vals(take_cfgs, truth_cfgs, max_depth=1)
prop_cfgs = ut.hmap_vals(take_qx, prop2_mat)
prop_item = ut.hmap_vals(take_cfgs, prop_cfgs, max_depth=0)
logger.info('truth2_prop[item] = ' + ut.repr3(truth_item, nl=2))
logger.info('prop2_mat[item] = ' + ut.repr3(prop_item, nl=1))
"""
from wbia.expt import cfghelpers
if verbose is None:
verbose = ut.NOT_QUIET
if verbose:
logger.info('[testres] case_sample2')
if isinstance(filt_cfg, str):
filt_cfg = [filt_cfg]
if isinstance(filt_cfg, list):
_combos = cfghelpers.parse_cfgstr_list2(filt_cfg, strict=False)
filt_cfg = ut.flatten(_combos)[0]
if isinstance(filt_cfg, str):
_combos = cfghelpers.parse_cfgstr_list2([filt_cfg], strict=False)
filt_cfg = ut.flatten(_combos)[0]
if filt_cfg is None:
filt_cfg = {}
qaids = testres.get_test_qaids() if qaids is None else qaids
truth2_prop, prop2_mat = testres.get_truth2_prop(qaids)
ibs = testres.ibs
# Initialize isvalid flags to all true
# np.ones(prop2_mat['is_success'].shape, dtype=np.bool)
participates = prop2_mat['participates']
is_valid = participates.copy()
def unflat_tag_filterflags(tags_list, **kwargs):
from wbia import tag_funcs
flat_tags, cumsum = ut.invertible_flatten2(tags_list)
flat_flags = tag_funcs.filterflags_general_tags(flat_tags, **kwargs)
flags = np.array(ut.unflatten2(flat_flags, cumsum))
return flags
UTFF = unflat_tag_filterflags
def cols_disagree(mat, val):
"""
is_success = prop2_mat['is_success']
"""
nCols = mat.shape[1]
sums = mat.sum(axis=1)
# Find out which rows have different values
disagree_flags1d = np.logical_and(sums > 0, sums < nCols)
disagree_flags2d = np.tile(disagree_flags1d[:, None], (1, nCols))
if not val:
# User asked for rows that agree
flags = np.logical_not(disagree_flags2d)
else:
flags = disagree_flags2d
return flags
def cfg_scoresep(mat, val, op):
"""
Compares scores between different configs
op = operator.ge
is_success = prop2_mat['is_success']
"""
# import scipy.spatial.distance as spdist
nCols = mat.shape[1]
pdistx = vt.pdist_indicies(nCols)
pdist_list = np.array([vt.safe_pdist(row) for row in mat])
flags_list = op(pdist_list, val)
colx_list = [
np.unique(ut.flatten(ut.compress(pdistx, flags))) for flags in flags_list
]
offsets = np.arange(0, nCols * len(mat), step=nCols)
idx_list = ut.flatten(
[colx + offset for colx, offset in zip(colx_list, offsets)]
)
mask = vt.index_to_boolmask(idx_list, maxval=offsets[-1] + nCols)
flags = mask.reshape(mat.shape)
return flags
# List of rules that can filter results
rule_list = [
('disagree', lambda val: cols_disagree(prop2_mat['is_failure'], val)),
(
'min_gt_cfg_scoresep',
lambda val: cfg_scoresep(truth2_prop['gt']['score'], val, operator.ge),
),
('fail', prop2_mat['is_failure']),
('success', prop2_mat['is_success']),
('min_gtrank', partial(operator.ge, truth2_prop['gt']['rank'])),
('max_gtrank', partial(operator.le, truth2_prop['gt']['rank'])),
('max_gtscore', partial(operator.le, truth2_prop['gt']['score'])),
('min_gtscore', partial(operator.ge, truth2_prop['gt']['score'])),
('min_gf_timedelta', partial(operator.ge, truth2_prop['gf']['timedelta'])),
('max_gf_timedelta', partial(operator.le, truth2_prop['gf']['timedelta'])),
# Tag filtering
# FIXME: will break with new config structure
('min_tags', lambda val: UTFF(testres.get_all_tags(), min_num=val)),
('max_tags', lambda val: UTFF(testres.get_all_tags(), max_num=val)),
('min_gf_tags', lambda val: UTFF(testres.get_gf_tags(), min_num=val)),
('max_gf_tags', lambda val: UTFF(testres.get_gf_tags(), max_num=val)),
('min_gt_tags', lambda val: UTFF(testres.get_gt_tags(), min_num=val)),
('max_gt_tags', lambda val: UTFF(testres.get_gt_tags(), max_num=val)),
(
'min_query_annot_tags',
lambda val: UTFF(testres.get_query_annot_tags(), min_num=val),
),
(
'min_gt_annot_tags',
lambda val: UTFF(testres.get_gt_annot_tags(), min_num=val),
),
(
'min_gtq_tags',
lambda val: UTFF(testres.get_gtquery_annot_tags(), min_num=val),
),
(
'max_gtq_tags',
lambda val: UTFF(testres.get_gtquery_annot_tags(), max_num=val),
),
('without_gf_tag', lambda val: UTFF(testres.get_gf_tags(), has_none=val)),
('without_gt_tag', lambda val: UTFF(testres.get_gt_tags(), has_none=val)),
('with_gf_tag', lambda val: UTFF(testres.get_gf_tags(), has_any=val)),
('with_gt_tag', lambda val: UTFF(testres.get_gt_tags(), has_any=val)),
('with_tag', lambda val: UTFF(testres.get_all_tags(), has_any=val)),
('without_tag', lambda val: UTFF(testres.get_all_tags(), has_none=val)),
]
rule_dict = ut.odict(rule_list)
rule_list.append(('max_gf_td', rule_dict['max_gf_timedelta']))
rule_list.append(('min_gf_td', rule_dict['min_gf_timedelta']))
filt_cfg_ = copy.deepcopy(filt_cfg)
# hack to convert to seconds
for tdkey in filt_cfg_.keys():
# timedelta_keys = ['min_gf_timedelta', 'max_gf_timedelta']
# for tdkey in timedelta_keys:
if tdkey.endswith('_timedelta'):
filt_cfg_[tdkey] = ut.ensure_timedelta(filt_cfg_[tdkey])
class VerbFilterInfo(object):
def __init__(self):
self.prev_num_valid = None
def print_pre(self, is_valid, filt_cfg_):
num_valid = is_valid.sum()
logger.info(
'[testres] Sampling from is_valid.size=%r with filt=%r'
% (is_valid.size, ut.get_cfg_lbl(filt_cfg_))
)
logger.info(' * is_valid.shape = %r' % (is_valid.shape,))
logger.info(' * num_valid = %r' % (num_valid,))
self.prev_num_valid = num_valid
def print_post(self, is_valid, flags, msg):
if flags is not None:
num_passed = flags.sum()
num_valid = is_valid.sum()
num_invalidated = self.prev_num_valid - num_valid
logger.info(msg)
if num_invalidated == 0:
if flags is not None:
logger.info(' * num_passed = %r' % (num_passed,))
logger.info(' * num_invalided = %r' % (num_invalidated,))
else:
logger.info(' * prev_num_valid = %r' % (self.prev_num_valid,))
logger.info(' * num_valid = %r' % (num_valid,))
# logger.info(' * is_valid.shape = %r' % (is_valid.shape,))
self.prev_num_valid = num_valid
verbinfo = VerbFilterInfo()
if verbose:
verbinfo.print_pre(is_valid, filt_cfg_)
# Pop irrelevant info
ut.delete_keys(filt_cfg_, ['_cfgstr', '_cfgindex', '_cfgname', '_cfgtype'])
# Pop other non-rule config options
valid_rules = []
def poprule(rulename, default):
# register other rule names for debuging
valid_rules.append(rulename)
return filt_cfg_.pop(rulename, default)
allcfg = poprule('allcfg', None)
orderby = poprule('orderby', None)
reverse = poprule('reverse', None)
sortasc = poprule('sortasc', None)
sortdsc = poprule('sortdsc', poprule('sortdesc', None))
max_pername = poprule('max_pername', None)
require_all_cfg = poprule('require_all_cfg', None)
index = poprule('index', None)
# Pop all chosen rules
rule_value_list = [poprule(key, None) for key, rule in rule_list]
# Assert that only valid configurations were given
if len(filt_cfg_) > 0:
logger.info('ERROR')
logger.info('filtcfg valid rules are = %s' % (ut.repr2(valid_rules, nl=1),))
for key in filt_cfg_.keys():
logger.info(
'did you mean %r instead of %r?'
% (ut.closet_words(key, valid_rules)[0], key)
)
raise NotImplementedError(
'Unhandled filt_cfg.keys() = %r' % (filt_cfg_.keys())
)
# Remove test cases that do not satisfy chosen rules
chosen_rule_idxs = ut.where([val is not None for val in rule_value_list])
chosen_rules = ut.take(rule_list, chosen_rule_idxs)
chosen_vals = ut.take(rule_value_list, chosen_rule_idxs)
for (key, rule), val in zip(chosen_rules, chosen_vals):
if isinstance(rule, np.ndarray):
# When a rule is an ndarray it must have boolean values
flags = rule == val
else:
flags = rule(val)
# HACK: flags are forced to be false for non-participating cases
flags = np.logical_and(flags, participates)
# conjunctive normal form of satisfiability
is_valid = np.logical_and(is_valid, flags)
if verbose:
verbinfo.print_post(is_valid, flags, 'SampleRule: %s = %r' % (key, val))
# HACK:
# If one config for a row passes the filter then all configs should pass
if allcfg:
is_valid = np.logical_or(np.logical_or.reduce(is_valid.T)[:, None], is_valid)
is_valid = np.logical_and(is_valid, participates)
qx_list, cfgx_list = np.nonzero(is_valid)
# Determine a good ordering of the test cases
if sortdsc is not None:
assert orderby is None, 'use orderby or sortasc'
assert reverse is None, 'reverse does not work with sortdsc'
orderby = sortdsc
reverse = True
elif sortasc is not None:
assert reverse is None, 'reverse does not work with sortasc'
assert orderby is None, 'use orderby or sortasc'
orderby = sortasc
reverse = False
else:
reverse = False
if orderby is not None:
# if orderby == 'gtscore':
# order_values = truth2_prop['gt']['score']
# elif orderby == 'gfscore':
# order_values = truth2_prop['gf']['score']
# else:
import re
order_values = None
for prefix_pattern in ['^gt_?', '^gf_?']:
prefix_match = re.match(prefix_pattern, orderby)
if prefix_match is not None:
truth = prefix_pattern[1:3]
propname = orderby[prefix_match.end() :]
if verbose:
logger.info(
'Ordering by truth=%s propname=%s' % (truth, propname)
)
order_values = truth2_prop[truth][propname]
break
if order_values is None:
raise NotImplementedError('Unknown orerby=%r' % (orderby,))
else:
order_values = np.arange(is_valid.size).reshape(is_valid.shape)
# Convert mask into indicies
flat_order = order_values[is_valid]
# Flat sorting indeices in a matrix
if verbose:
if verbose:
logger.info('Reversing ordering (descending)')
else:
logger.info('Normal ordering (ascending)')
if reverse:
sortx = flat_order.argsort()[::-1]
else:
sortx = flat_order.argsort()
qx_list = qx_list.take(sortx, axis=0)
cfgx_list = cfgx_list.take(sortx, axis=0)
# Return at most ``max_pername`` annotation examples per name
if max_pername is not None:
if verbose:
logger.info('Returning at most %d cases per name ' % (max_pername,))
# FIXME: multiple configs
_qaid_list = np.take(qaids, qx_list)
_qnid_list = ibs.get_annot_nids(_qaid_list)
_valid_idxs = []
seen_ = ut.ddict(lambda: 0)
for idx, _qnid in enumerate(_qnid_list):
if seen_[_qnid] < max_pername:
seen_[_qnid] += 1
_valid_idxs.append(idx)
_qx_list = qx_list[_valid_idxs]
_cfgx_list = cfgx_list[_valid_idxs]
_valid_index = np.vstack((_qx_list, _cfgx_list)).T
is_valid = vt.index_to_boolmask(_valid_index, is_valid.shape, isflat=False)
qx_list = _qx_list
cfgx_list = _cfgx_list
if require_all_cfg:
if verbose:
prev_num_valid = is_valid.sum()
logger.info('Enforcing that all configs must pass filters')
logger.info(' * prev_num_valid = %r' % (prev_num_valid,))
qx2_valid_cfgs = ut.group_items(cfgx_list, qx_list)
hasall_cfg = [len(qx2_valid_cfgs[qx]) == testres.nConfig for qx in qx_list]
_qx_list = qx_list.compress(hasall_cfg)
_cfgx_list = cfgx_list.compress(hasall_cfg)
_valid_index = np.vstack((_qx_list, _cfgx_list)).T
is_valid = vt.index_to_boolmask(_valid_index, is_valid.shape, isflat=False)
qx_list = _qx_list
cfgx_list = _cfgx_list
if verbose:
verbinfo.print_post(
is_valid, None, 'Enforcing that all configs must pass filters'
)
if index is not None:
if isinstance(index, str):
index = ut.smart_cast(index, slice)
_qx_list = ut.take(qx_list, index)
_cfgx_list = ut.take(cfgx_list, index)
_valid_index = np.vstack((_qx_list, _cfgx_list)).T
is_valid = vt.index_to_boolmask(_valid_index, is_valid.shape, isflat=False)
qx_list = _qx_list
cfgx_list = _cfgx_list
if verbose:
verbinfo.print_post(
is_valid,
None,
'Taking index=%r sample from len(qx_list) = %r'
% (index, len(qx_list)),
)
if not return_mask:
case_pos_list = np.vstack((qx_list, cfgx_list)).T
case_identifier = case_pos_list
else:
if verbose:
logger.info('Converting cases indicies to a 2d-mask')
case_identifier = is_valid
if verbose:
logger.info('Finished case filtering')
logger.info('Final case stats:')
qx_hist = ut.dict_hist(qx_list)
logger.info(
'config per query stats: %r' % (ut.get_stats_str(qx_hist.values()),)
)
logger.info(
'query per config stats: %r'
% (ut.get_stats_str(ut.dict_hist(cfgx_list).values()),)
)
return case_identifier
[docs] def get_truth2_prop(testres, qaids=None, join_acfg=False):
r"""
Returns:
tuple: (truth2_prop, prop2_mat)
CommandLine:
python -m wbia.expt.test_result --exec-get_truth2_prop --show
Example:
>>> # xdoctest: +REQUIRES(--slow)
>>> # ENABLE_DOCTEST
>>> from wbia.expt.test_result import * # NOQA
>>> import wbia
>>> ibs, testres = wbia.testdata_expts('PZ_MTEST', a=['ctrl'])
>>> (truth2_prop, prop2_mat) = testres.get_truth2_prop()
>>> result = '(truth2_prop, prop2_mat) = %s' % str((truth2_prop, prop2_mat))
>>> print(result)
>>> ut.quit_if_noshow()
>>> import wbia.plottool as pt
>>> ut.show_if_requested()
"""
ibs = testres.ibs
test_qaids = testres.get_test_qaids() if qaids is None else qaids
# test_qaids = ut.random_sample(test_qaids, 20)
truth2_prop = ut.ddict(ut.odict)
# TODO: have this function take in a case_pos_list as input instead
participates = testres.get_infoprop_mat('participant', test_qaids)
truth2_prop['gt']['aid'] = testres.get_infoprop_mat('qx2_gt_aid', test_qaids)
truth2_prop['gf']['aid'] = testres.get_infoprop_mat('qx2_gf_aid', test_qaids)
truth2_prop['gt']['rank'] = testres.get_infoprop_mat('qx2_gt_rank', test_qaids)
truth2_prop['gf']['rank'] = testres.get_infoprop_mat('qx2_gf_rank', test_qaids)
truth2_prop['gt']['score'] = testres.get_infoprop_mat(
'qx2_gt_raw_score', test_qaids
)
truth2_prop['gf']['score'] = testres.get_infoprop_mat(
'qx2_gf_raw_score', test_qaids
)
truth2_prop['gt']['score'] = np.nan_to_num(truth2_prop['gt']['score'])
truth2_prop['gf']['score'] = np.nan_to_num(truth2_prop['gf']['score'])
# Cast nans to ints (that are participants)
# if False:
for truth in ['gt', 'gf']:
rank_mat = truth2_prop[truth]['rank']
flags = np.logical_and(np.isnan(rank_mat), participates)
rank_mat[flags] = testres.get_worst_possible_rank()
# truth2_prop[truth]['rank'] = rank_mat.astype(np.int)
is_success = truth2_prop['gt']['rank'] == 0
is_failure = np.logical_not(is_success)
# THIS IS NOT THE CASE IF THERE ARE UNKNOWN INDIVIDUALS IN THE DATABASE
assert np.all(is_success == (truth2_prop['gt']['rank'] == 0))
# WEIRD THINGS HAPPEN WHEN UNKNOWNS ARE HERE
# hardness_degree_rank[is_success]
# These probably just completely failure spatial verification
# is_weird = hardness_degree_rank == 0
# Get timedelta and annotmatch rowid
for truth in ['gt', 'gf']:
aid_mat = truth2_prop[truth]['aid']
timedelta_mat = np.vstack(
[ibs.get_annot_pair_timedelta(test_qaids, aids) for aids in aid_mat.T]
).T
annotmatch_rowid_mat = np.vstack(
[
ibs.get_annotmatch_rowid_from_undirected_superkey(test_qaids, aids)
for aids in aid_mat.T
]
).T
truth2_prop[truth]['annotmatch_rowid'] = annotmatch_rowid_mat
truth2_prop[truth]['timedelta'] = timedelta_mat
prop2_mat = {}
prop2_mat['is_success'] = is_success
prop2_mat['is_failure'] = is_failure
prop2_mat['participates'] = participates
groupxs = testres.get_cfgx_groupxs()
def group_prop(val, grouped_flags, groupxs):
nRows = len(val)
# Allocate space for new val
new_shape = (nRows, len(groupxs))
if val.dtype == object or val.dtype.type == object:
new_val = np.full(new_shape, None, dtype=val.dtype)
elif ut.is_float(val):
new_val = np.full(new_shape, np.nan, dtype=val.dtype)
else:
new_val = np.zeros(new_shape, dtype=val.dtype)
# Populate new val
grouped_vals = vt.apply_grouping(val.T, groupxs)
_iter = enumerate(zip(grouped_flags, grouped_vals))
for new_col, (flags, group) in _iter:
rows, cols = np.where(flags.T)
new_val[rows, new_col] = group.T[(rows, cols)]
return new_val
if join_acfg:
assert ut.allsame(participates.sum(axis=1))
grouped_flags = vt.apply_grouping(participates.T, groupxs)
# new_prop2_mat = {key: group_prop(val)
# for key, val in prop2_mat.items()}
# new_truth2_prop = {
# truth: {key: group_prop(val)
# for key, val in props.items()}
# for truth, props in truth2_prop.items()}
new_prop2_mat = {}
for key, val in prop2_mat.items():
new_prop2_mat[key] = group_prop(val, grouped_flags, groupxs)
new_truth2_prop = {}
for truth, props in truth2_prop.items():
new_props = {}
for key, val in props.items():
new_props[key] = group_prop(val, grouped_flags, groupxs)
new_truth2_prop[truth] = new_props
prop2_mat_ = new_prop2_mat
truth2_prop_ = new_truth2_prop
else:
prop2_mat_ = prop2_mat
truth2_prop_ = truth2_prop
return truth2_prop_, prop2_mat_
[docs] def interact_individual_result(testres, qaid, cfgx=0):
ibs = testres.ibs
cfgx_list = ut.ensure_iterable(cfgx)
qreq_list = ut.take(testres.cfgx2_qreq_, cfgx_list)
# Preload any requested configs
cm_list = [qreq_.execute(qaids=[qaid]) for qreq_ in qreq_list]
cfgx2_shortlbl = testres.get_short_cfglbls()
show_kwargs = {
'N': 3,
'ori': True,
'ell_alpha': 0.9,
}
# SHOW ANALYSIS
show_kwargs['show_query'] = False
show_kwargs['viz_name_score'] = True
show_kwargs['show_timedelta'] = True
show_kwargs['show_gf'] = True
show_kwargs['with_figtitle'] = False
for cfgx, cm, qreq_ in zip(cfgx_list, cm_list, qreq_list):
query_lbl = cfgx2_shortlbl[cfgx]
fnum = cfgx
cm.ishow_analysis(
ibs,
figtitle=query_lbl,
fnum=fnum,
annot_mode=1,
qreq_=qreq_,
**show_kwargs
)
[docs] def draw_score_diff_disti(testres):
r"""
CommandLine:
python -m wbia --tf TestResult.draw_score_diff_disti --show -a varynannots_td -t best --db PZ_Master1
python -m wbia --tf TestResult.draw_score_diff_disti --show -a varynannots_td -t best --db GZ_Master1
python -m wbia --tf TestResult.draw_score_diff_disti --show -a varynannots_td1h -t best --db GIRM_Master1
python -m wbia --tf TestResult.draw_score_diff_disti --show -a varynannots_td:qmin_pername=3,dpername=2 -t best --db PZ_Master1
python -m wbia --tf get_annotcfg_list -a varynannots_td -t best --db PZ_Master1
13502
python -m wbia --tf draw_match_cases --db PZ_Master1 -a varynannots_td:dsample_size=.01 -t best --show --qaid 13502
python -m wbia --tf draw_match_cases --db PZ_Master1 -a varynannots_td -t best --show
Example:
>>> # DISABLE_DOCTEST
>>> from wbia.expt.test_result import * # NOQA
>>> import wbia
>>> ibs, testres = wbia.testdata_expts('PZ_Master1', a=['varynannots_td'], t=['best'])
>>> result = testres.draw_score_diff_disti()
>>> print(result)
>>> ut.show_if_requested()
"""
import wbia.plottool as pt
import vtool as vt
# dont look at filtered cases
ibs = testres.ibs
qaids = testres.get_test_qaids()
qaids = ibs.get_annot_tag_filterflags(qaids, {'has_none': 'timedeltaerror'})
gt_rawscore = testres.get_infoprop_mat('qx2_gt_raw_score', qaids=qaids)
gf_rawscore = testres.get_infoprop_mat('qx2_gf_raw_score', qaids=qaids)
gt_valid_flags_list = np.isfinite(gt_rawscore).T
gf_valid_flags_list = np.isfinite(gf_rawscore).T
cfgx2_gt_scores = vt.zipcompress(gt_rawscore.T, gt_valid_flags_list)
cfgx2_gf_scores = vt.zipcompress(gf_rawscore.T, gf_valid_flags_list)
# partition by rank
gt_rank = testres.get_infoprop_mat('qx2_gt_rank', qaids=qaids)
gf_ranks = testres.get_infoprop_mat('qx2_gf_rank', qaids=qaids)
cfgx2_gt_ranks = vt.zipcompress(gt_rank.T, gt_valid_flags_list)
cfgx2_rank0_gt_scores = vt.zipcompress(
cfgx2_gt_scores, [ranks == 0 for ranks in cfgx2_gt_ranks]
)
cfgx2_rankX_gt_scores = vt.zipcompress(
cfgx2_gt_scores, [ranks > 0 for ranks in cfgx2_gt_ranks]
)
cfgx2_gf_ranks = vt.zipcompress(gf_ranks.T, gf_valid_flags_list)
cfgx2_rank0_gf_scores = vt.zipcompress(
cfgx2_gf_scores, [ranks == 0 for ranks in cfgx2_gf_ranks]
)
# valid_gtranks = gt_rank[isvalid]
# valid_qaids = qaids[isvalid]
# Hack remove timedelta error
# valid_qaids = valid_qaids[flags]
# valid_gt_rawscore = valid_gt_rawscore[flags]
# valid_gtranks = valid_gtranks[flags]
xdata = list(map(len, testres.cfgx2_daids))
USE_MEDIAN = True # not ut.get_argflag('--use-mean')
# USE_LOG = True
USE_LOG = False
if USE_MEDIAN:
ave = np.median
dev = vt.median_abs_dev
else:
ave = np.mean
dev = np.std
def make_interval_args(arr_list, ave=ave, dev=dev, **kwargs):
# if not USE_MEDIAN:
# # maybe approximate median by removing the most extreme values
# arr_list = [np.array(sorted(arr))[5:-5] for arr in arr_list]
import utool as ut
if USE_LOG:
arr_list = list(map(lambda x: np.log(x + 1), arr_list))
sizes_ = list(map(len, arr_list))
ydata_ = list(map(ave, arr_list))
spread_ = list(map(dev, arr_list))
# ut.get_stats(arr_list, axis=0)
label = kwargs.get('label', '')
label += ' ' + ut.get_funcname(ave)
kwargs['label'] = label
logger.info(
label
+ 'score stats : '
+ ut.repr2(
ut.get_jagged_stats(arr_list, use_median=True), nl=1, precision=1
)
)
return ydata_, spread_, kwargs, sizes_
args_list1 = [
make_interval_args(cfgx2_gt_scores, label='GT', color=pt.TRUE_BLUE),
make_interval_args(cfgx2_gf_scores, label='GF', color=pt.FALSE_RED),
]
args_list2 = [
make_interval_args(
cfgx2_rank0_gt_scores, label='GT-rank = 0', color=pt.LIGHT_GREEN
),
make_interval_args(
cfgx2_rankX_gt_scores, label='GT-rank > 0', color=pt.YELLOW
),
make_interval_args(cfgx2_rank0_gf_scores, label='GF-rank = 0', color=pt.PINK),
# make_interval_args(cfgx2_rank2_gt_scores, label='gtrank < 2'),
]
plotargs_list = [args_list1, args_list2]
# plotargs_list = [args_list1]
ymax = -np.inf
ymin = np.inf
for args_list in plotargs_list:
ydata_list = np.array(ut.get_list_column(args_list, 0))
spread = np.array(ut.get_list_column(args_list, 1))
ymax = max(ymax, np.array(ydata_list + spread).max())
ymin = min(ymax, np.array(ydata_list - spread).min())
ylabel = 'log name score' if USE_LOG else 'name score'
statickw = dict(
# title='scores vs dbsize',
xlabel='database size (number of annotations)',
ylabel=ylabel,
# xscale='log', ymin=0, ymax=10,
linewidth=2,
spread_alpha=0.5,
lightbg=True,
marker='o',
# xmax='data',
ymax=ymax,
ymin=ymin,
xmax='data',
xmin='data',
)
fnum = pt.ensure_fnum(None)
pnum_ = pt.make_pnum_nextgen(len(plotargs_list), 1)
for args_list in plotargs_list:
ydata_list = ut.get_list_column(args_list, 0)
spread_list = ut.get_list_column(args_list, 1)
kwargs_list = ut.get_list_column(args_list, 2)
sizes_list = ut.get_list_column(args_list, 3)
logger.info('sizes_list = %s' % (ut.repr2(sizes_list, nl=1),))
# Pack kwargs list for multi_plot
plotkw = ut.dict_stack2(kwargs_list, '_list')
plotkw2 = ut.merge_dicts(statickw, plotkw)
pt.multi_plot(
xdata,
ydata_list,
spread_list=spread_list,
fnum=fnum,
pnum=pnum_(),
**plotkw2
)
# pt.adjust_subplots(hspace=.3)
figtitle = 'Score vs DBSize: %s' % (testres.get_title_aug())
pt.set_figtitle(figtitle)
[docs] def draw_rank_cmc(testres):
"""
Wrapper
"""
from wbia.expt import experiment_drawing
experiment_drawing.draw_rank_cmc(testres.ibs, testres)
[docs] def draw_match_cases(testres, **kwargs):
"""
Wrapper
"""
from wbia.expt import experiment_drawing
experiment_drawing.draw_match_cases(testres.ibs, testres, **kwargs)
[docs] def draw_failure_cases(testres, **kwargs):
"""
>>> from wbia.other.dbinfo import * # NOQA
>>> import wbia
>>> ibs, testres = wbia.testdata_expts(defaultdb='PZ_MTEST', a='timectrl:qsize=2', t='invar:AI=[False],RI=False', use_cache=False)
"""
from wbia.expt import experiment_drawing
# kwargs = kwargs.copy()
orig_filter = ':'
kwargs['f'] = orig_filter + 'fail'
case_pos_list = testres.case_sample2(':fail=True,index=0:5')
experiment_drawing.draw_match_cases(
testres.ibs,
testres,
case_pos_list=case_pos_list,
annot_modes=[1],
interact=True,
)
[docs] def find_score_thresh_cutoff(testres):
"""
FIXME
DUPLICATE CODE
rectify with experiment_drawing
"""
# import wbia.plottool as pt
import vtool as vt
if ut.VERBOSE:
logger.info('[dev] FIX DUPLICATE CODE find_thresh_cutoff')
# from wbia.expt import cfghelpers
assert len(testres.cfgx2_qreq_) == 1, 'can only specify one config here'
cfgx = 0
# qreq_ = testres.cfgx2_qreq_[cfgx]
test_qaids = testres.get_test_qaids()
gt_rawscore = testres.get_infoprop_mat('qx2_gt_raw_score', qaids=test_qaids).T[
cfgx
]
gf_rawscore = testres.get_infoprop_mat('qx2_gf_raw_score', qaids=test_qaids).T[
cfgx
]
# FIXME: may need to specify which cfg is used in the future
# isvalid = testres.case_sample2(filt_cfg, return_mask=True).T[cfgx]
tp_nscores = gt_rawscore
tn_nscores = gf_rawscore
tn_qaids = tp_qaids = test_qaids
# encoder = vt.ScoreNormalizer(target_tpr=.7)
# logger.info(qreq_.get_cfgstr())
part_attrs = {1: {'qaid': tp_qaids}, 0: {'qaid': tn_qaids}}
fpr = None
tpr = 0.85
encoder = vt.ScoreNormalizer(adjust=8, fpr=fpr, tpr=tpr, monotonize=True)
# tp_scores = tp_nscores
# tn_scores = tn_nscores
name_scores, labels, attrs = encoder._to_xy(tp_nscores, tn_nscores, part_attrs)
encoder.fit(name_scores, labels, attrs)
score_thresh = encoder.learn_threshold2()
# Find intersection point
# TODO: add to score normalizer.
# Improve robustness
# pt.figure()
# pt.plot(xdata, curve)
# pt.plot(x_submax, y_submax, 'o')
return score_thresh
[docs] def print_percent_identification_success(testres):
"""
Prints names identified (at rank 1) / names queried.
This combines results over multiple queries of a particular name using
max
OLD, MAYBE DEPRIATE
Example:
>>> # DISABLE_DOCTEST
>>> from wbia.expt.test_result import * # NOQA
"""
ibs = testres.ibs
qaids = testres.get_test_qaids()
unique_nids, groupxs = ut.group_indices(ibs.get_annot_nids(qaids))
qx2_gt_raw_score = testres.get_infoprop_mat('qx2_gt_raw_score', qaids=qaids)
qx2_gf_raw_score = testres.get_infoprop_mat('qx2_gf_raw_score', qaids=qaids)
nx2_gt_raw_score = np.array(
[
np.nanmax(scores, axis=0)
for scores in vt.apply_grouping(qx2_gt_raw_score, groupxs)
]
)
nx2_gf_raw_score = np.array(
[
np.nanmax(scores, axis=0)
for scores in vt.apply_grouping(qx2_gf_raw_score, groupxs)
]
)
cfgx2_success = (nx2_gt_raw_score > nx2_gf_raw_score).T
logger.info('Identification success (names identified / names queried)')
for cfgx, success in enumerate(cfgx2_success):
pipelbl = testres.cfgx2_lbl[cfgx]
percent = 100 * success.sum() / len(success)
logger.info(
'%2d) success = %r/%r = %.2f%% -- %s'
% (cfgx, success.sum(), len(success), percent, pipelbl)
)
[docs] def print_config_overlap(testres, with_plot=True):
truth2_prop, prop2_mat = testres.get_truth2_prop()
qx2_gt_ranks = truth2_prop['gt']['rank']
qx2_success = qx2_gt_ranks == 0
cfgx2_num_correct = np.nansum(qx2_success, axis=0)
best_cfgx = cfgx2_num_correct.argmax()
logger.info('Config Overlap')
# Matrix version
# disjoint_mat = np.zeros((testres.nConfig, testres.nConfig), dtype=np.int32)
# improves_mat = np.zeros((testres.nConfig, testres.nConfig), dtype=np.int32)
isect_mat = np.zeros((testres.nConfig, testres.nConfig), dtype=np.int32)
union_mat = np.zeros((testres.nConfig, testres.nConfig), dtype=np.int32)
for cfgx1 in range(testres.nConfig):
for cfgx2 in range(testres.nConfig):
if cfgx1 == cfgx2:
success_qx1 = np.where(qx2_success.T[cfgx1])[0]
isect_mat[cfgx1][cfgx2] = len(success_qx1)
union_mat[cfgx1][cfgx2] = len(success_qx1)
continue
success_qx1 = np.where(qx2_success.T[cfgx1])[0]
success_qx2 = np.where(qx2_success.T[cfgx2])[0]
union_ = np.union1d(success_qx1, success_qx2)
isect_ = np.intersect1d(success_qx1, success_qx2)
# disjoints = np.setdiff1d(union_, isect_)
# disjoint_mat[cfgx1][cfgx2] = len(disjoints)
isect_mat[cfgx1][cfgx2] = len(isect_)
union_mat[cfgx1][cfgx2] = len(union_)
# improves = np.setdiff1d(success_qx2, isect_)
# improves_mat[cfgx2][cfgx1] = len(improves)
n_success_list = np.array(
[qx2_success.T[cfgx1].sum() for cfgx1 in range(testres.nConfig)]
)
improves_mat = n_success_list[:, None] - isect_mat
disjoint_mat = union_mat - isect_mat
logger.info('n_success_list = %r' % (n_success_list,))
logger.info('union_mat =\n%s' % (union_mat,))
logger.info('isect_mat =\n%s' % (isect_mat,))
logger.info('cfgx1 and cfgx2 have <x> not in common')
logger.info('disjoint_mat =\n%s' % (disjoint_mat,))
logger.info('cfgx1 helps cfgx2 by <x>')
logger.info('improves_mat =\n%s' % (improves_mat,))
logger.info('improves_mat.sum(axis=1) = \n%s' % (improves_mat.sum(axis=1),))
bestx_by_improves = improves_mat.sum(axis=1).argmax()
logger.info('bestx_by_improves = %r' % (bestx_by_improves,))
# Numbered version
logger.info('best_cfgx = %r' % (best_cfgx,))
for cfgx in range(testres.nConfig):
if cfgx == best_cfgx:
continue
pipelbl = testres.cfgx2_lbl[cfgx]
qx2_anysuccess = np.logical_or(qx2_success.T[cfgx], qx2_success.T[best_cfgx])
# Queries that other got right that best did not get right
qx2_othersuccess = np.logical_and(
qx2_anysuccess, np.logical_not(qx2_success.T[best_cfgx])
)
logger.info(
'cfgx %d) has %d success cases that that the best config does not have -- %s'
% (cfgx, qx2_othersuccess.sum(), pipelbl)
)
qx2_success.T[cfgx]
if with_plot:
# y = None
# for x in qx2_gt_ranks:
# x = np.minimum(x, 3)
# z = (x.T - x[:, None])
# if np.any(z):
# logger.info(z)
# if y is None:
# y = z
# else:
# y += z
if False:
# Chip size stats
ave_dlen = [ # NOQA
np.sqrt(
np.array(
testres.ibs.get_annot_chip_dlensqrd(
testres.qaids, config2_=qreq_.query_config2_
)
)
).mean()
for qreq_ in testres.cfgx2_qreq_
]
ave_width_inimg = [ # NOQA
np.array(
testres.ibs.get_annot_bboxes(
testres.qaids, config2_=qreq_.query_config2_
)
)[:, 2 + 0].mean()
for qreq_ in testres.cfgx2_qreq_
]
ave_width = [ # NOQA
np.array(
testres.ibs.get_annot_chip_sizes(
testres.qaids, config2_=qreq_.query_config2_
)
)[:, 0].mean()
for qreq_ in testres.cfgx2_qreq_
]
import wbia.plottool as pt
# pt.plt.imshow(-y, interpolation='none', cmap='hot')
# pt.plt.colorbar()
def label_ticks():
import wbia.plottool as pt
ax = pt.gca()
labels = testres.get_varied_labels()
ax.set_xticks(list(range(len(labels))))
ax.set_xticklabels([lbl[0:100] for lbl in labels])
[lbl.set_rotation(-25) for lbl in ax.get_xticklabels()]
[lbl.set_horizontalalignment('left') for lbl in ax.get_xticklabels()]
# xgrid, ygrid = np.meshgrid(range(len(labels)), range(len(labels)))
# pt.plot_surface3d(xgrid, ygrid, disjoint_mat)
ax.set_yticks(list(range(len(labels))))
ax.set_yticklabels([lbl[0:100] for lbl in labels])
[lbl.set_horizontalalignment('right') for lbl in ax.get_yticklabels()]
[lbl.set_verticalalignment('center') for lbl in ax.get_yticklabels()]
# [lbl.set_rotation(20) for lbl in ax.get_yticklabels()]
pt.figure(fnum=pt.next_fnum())
pt.plt.imshow(union_mat, interpolation='none', cmap='hot')
pt.plt.colorbar()
pt.set_title(
'union mat: cfg<x> and cfg<y> have <z> success cases in in total'
)
label_ticks()
label_ticks()
pt.figure(fnum=pt.next_fnum())
pt.plt.imshow(isect_mat, interpolation='none', cmap='hot')
pt.plt.colorbar()
pt.set_title('isect mat: cfg<x> and cfg<y> have <z> success cases in common')
label_ticks()
pt.figure(fnum=pt.next_fnum())
pt.plt.imshow(disjoint_mat, interpolation='none', cmap='hot')
pt.plt.colorbar()
pt.set_title(
'disjoint mat (union - isect): cfg<x> and cfg<y> have <z> success cases not in common'
)
# xgrid, ygrid = np.meshgrid(range(len(labels)), range(len(labels)))
# pt.plot_surface3d(xgrid, ygrid, improves_mat)
pt.figure(fnum=pt.next_fnum())
pt.plt.imshow(improves_mat, interpolation='none', cmap='hot')
pt.plt.colorbar()
pt.set_title(
'improves mat (diag.T - isect): cfg<x> got <z> qaids that cfg <y> missed'
)
label_ticks()
# pt.colorbar(np.unique(y))
[docs] def map_score(testres):
"""
For each query compute a precision recall curve.
Then, for each query compute the average precision.
Then take the mean of all average precisions to obtain the mAP.
Script:
>>> #ibs = wbia.opendb('Oxford')
>>> #ibs, testres = wbia.testdata_expts('Oxford', a='oxford', p='smk:nWords=[64000],nAssign=[1],SV=[False,True]')
>>> import wbia
>>> ibs, testres = wbia.testdata_expts('Oxford', a='oxford', p='smk:nWords=[64000],nAssign=[1],SV=[False,True],can_match_sameimg=True')
>>> import wbia
>>> ibs, testres = wbia.testdata_expts('Oxford', a='oxford', p='smk:nWords=[64000],nAssign=[1],SV=[False],can_match_sameimg=True')
"""
import sklearn.metrics
qaids = testres.get_test_qaids()
ibs = testres.ibs
PLOT = True
PLOT = False
cfgx2_cms = []
for qreq_ in testres.cfgx2_qreq_:
cm_list = qreq_.execute(qaids)
cm_list = [cm.extend_results(qreq_) for cm in cm_list]
for cm in cm_list:
cm.score_annot_csum(qreq_)
# #cm.sortself()
cfgx2_cms.append(cm_list)
map_list = []
unique_names, groupxs = ut.group_indices(ibs.annots(qaids).names)
for cm_list, qreq_ in zip(cfgx2_cms, testres.cfgx2_qreq_):
if PLOT:
import wbia.plottool as pt
pt.qt4ensure()
fnum = pt.ensure_fnum(None)
pt.figure(fnum=fnum)
avep_list = []
# fnum = pt.ensure_fnum(None)
# pt.figure(fnum=fnum)
for cm in cm_list:
# Ignore junk images
flags = np.array(ibs.annots(cm.daid_list).quality_texts) != 'junk'
assert np.all(flags)
daid_list = cm.daid_list
dnid_list = cm.dnid_list
y_true = (cm.qnid == dnid_list).compress(flags).astype(np.int)
y_score = cm.annot_score_list.compress(flags)
y_score[~np.isfinite(y_score)] = 0
y_score = np.nan_to_num(y_score)
sortx = np.argsort(y_score)[::-1]
daid_list = daid_list.take(sortx)
dnid_list = dnid_list.take(sortx)
y_true = y_true.take(sortx)
y_score = y_score.take(sortx)
# logger.info(cm.get_annot_ranks(cm.get_top_gt_aids(ibs)))
precision, recall, thresholds = sklearn.metrics.precision_recall_curve(
y_true, y_score
)
if PLOT:
pt.plot2(
recall,
precision,
marker='',
linestyle='-',
x_label='recall',
y_label='precision',
)
avep = sklearn.metrics.average_precision_score(y_true, y_score)
# avep = [
# sklearn.metrics.average_precision_score(y_true, y_score, average=average)
# for average in ['micro', 'macro', 'samples', 'weighted']
# ]
# if np.any(np.isnan(avep)):
# break
# if np.isnan(avep):
# break
avep_list.append(avep)
# mean_ave_precision = np.mean(avep_list, axis=0)
name_to_ave = [np.mean(a) for a in ut.apply_grouping(avep_list, groupxs)]
name_to_ave_ = dict(zip(unique_names, name_to_ave))
logger.info(
'name_to_ave_ = %s' % (ut.align(ut.repr3(name_to_ave_, precision=3), ':'))
)
mean_ave_precision = np.mean(name_to_ave)
logger.info('mean_ave_precision = %r' % (mean_ave_precision,))
map_list.append(mean_ave_precision)
return map_list
[docs] def embed_testres(testres):
"""
CommandLine:
python -m wbia TestResults.embed_testres
Example:
>>> # SCRIPT
>>> from wbia.expt.test_result import * # NOQA
>>> from wbia.init import main_helpers
>>> ibs, testres = main_helpers.testdata_expts(defaultdb='PZ_MTEST')
>>> embed_testres(testres)
"""
ut.embed()
[docs] def get_options(testres):
func_list = [
testres.print_results,
testres.draw_rank_cmc,
testres.draw_match_cases,
testres.embed_testres,
]
return func_list
[docs] def get_actions(testres):
actions = ut.odict(
[
(testres.print_results, (['print', 'p'], '')),
(testres.draw_rank_cmc, (['cmc'], '')),
(testres.draw_match_cases, (['case'], '')),
(testres.embed_testres, (['embed', 'ipy'], '')),
]
)
return actions
[docs] def help(testres):
# list functions that accept the standard interface
prefix = 'wbia'
suffix = testres.reconstruct_test_flags()
func_list = testres.get_options()
funcname_list = [ut.get_funcname(func) for func in func_list]
cmdstr_list = [' '.join([prefix, funcname, suffix]) for funcname in funcname_list]
ut.cprint('Available Functions:', 'blue')
logger.info(', '.join(funcname_list))
ut.cprint('Available Commandline:', 'blue')
logger.info('\n'.join(cmdstr_list))