Source code for wbia.algo.verif.clf_helpers

# -*- coding: utf-8 -*-
"""
This module is a work in progress; as such, its concepts are subject to change.

MAIN IDEA:
    `MultiTaskSamples` serves as a structure to contain and manipulate a set of
    samples with potentially many different types of labels and features.
"""
import logging
import utool as ut
import ubelt as ub
import numpy as np
from wbia import dtool as dt
import pandas as pd
import sklearn
import sklearn.metrics
import sklearn.ensemble
import sklearn.impute
import sklearn.pipeline
import sklearn.neural_network
import sklearn.multiclass  # used by _get_estimator for OVR/OVO wrappers
import sklearn.svm
import sklearn.linear_model
from wbia.algo.verif import sklearn_utils

print, rrr, profile = ut.inject2(__name__)
logger = logging.getLogger('wbia')


class XValConfig(dt.Config):
    _param_info_list = [
        # ut.ParamInfo('type', 'StratifiedKFold'),
        ut.ParamInfo('type', 'StratifiedGroupKFold'),
        ut.ParamInfo('n_splits', 3),
        ut.ParamInfo(
            'shuffle', True, hideif=lambda cfg: cfg['type'] == 'StratifiedGroupKFold'
        ),
        ut.ParamInfo(
            'random_state',
            3953056901,
            hideif=lambda cfg: cfg['type'] == 'StratifiedGroupKFold',
        ),
    ]
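

# --- Illustrative usage sketch (editor addition, not part of the original
# module). XValConfig is a dt.Config, so it can be indexed like a dict and
# serialized into a cfgstr for cache keys; `asdict` and `get_cfgstr` are the
# same calls ClfProblem makes below. Assumes utool/dtool behave as used here.
def _example_xval_config_usage():
    xval_kw = XValConfig()
    xval_kw['type'] = 'StratifiedKFold'  # switch away from the group-aware default
    params = xval_kw.asdict()            # plain dict handed to the splitter
    cfgstr = xval_kw.get_cfgstr()        # string hashed into classifier cache names
    return params, cfgstr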


@ut.reloadable_class
class ClfProblem(ut.NiceRepr):
    def __init__(pblm):
        pblm.deploy_task_clfs = None
        pblm.eval_task_clfs = None
        pblm.xval_kw = XValConfig()
        pblm.eval_task_clfs = None
        pblm.task_combo_res = None
        pblm.verbose = True

    def set_pandas_options(pblm):
        # pd.options.display.max_rows = 10
        pd.options.display.max_rows = 20
        pd.options.display.max_columns = 40
        pd.options.display.width = 160
        pd.options.display.float_format = lambda x: '%.4f' % (x,)

    def set_pandas_options_low(pblm):
        # pd.options.display.max_rows = 10
        pd.options.display.max_rows = 5
        pd.options.display.max_columns = 40
        pd.options.display.width = 160
        pd.options.display.float_format = lambda x: '%.4f' % (x,)

    def set_pandas_options_normal(pblm):
        # pd.options.display.max_rows = 10
        pd.options.display.max_rows = 20
        pd.options.display.max_columns = 40
        pd.options.display.width = 160
        pd.options.display.float_format = lambda x: '%.4f' % (x,)

    def learn_evaluation_classifiers(pblm, task_keys=None, clf_keys=None, data_keys=None):
        """
        Evaluates by learning classifiers using cross validation.
        Do not use this to learn production classifiers.

        python -m wbia.algo.verif.vsone evaluate_classifiers --db PZ_PB_RF_TRAIN --show

        CommandLine:
            python -m clf_helpers learn_evaluation_classifiers

        Example:
            >>> # ENABLE_DOCTEST
            >>> from wbia.algo.verif.clf_helpers import *  # NOQA
            >>> pblm = IrisProblem()
            >>> pblm.setup()
            >>> pblm.verbose = True
            >>> pblm.eval_clf_keys = ['Logit', 'RF']
            >>> pblm.eval_task_keys = ['iris']
            >>> pblm.eval_data_keys = ['learn(all)']
            >>> result = pblm.learn_evaluation_classifiers()
            >>> res = pblm.task_combo_res['iris']['Logit']['learn(all)']
            >>> res.print_report()
            >>> res = pblm.task_combo_res['iris']['RF']['learn(all)']
            >>> res.print_report()
            >>> print(result)
        """
        pblm.eval_task_clfs = ut.AutoVivification()
        pblm.task_combo_res = ut.AutoVivification()

        if task_keys is None:
            task_keys = pblm.eval_task_keys
        if data_keys is None:
            data_keys = pblm.eval_data_keys
        if clf_keys is None:
            clf_keys = pblm.eval_clf_keys

        if task_keys is None:
            task_keys = [pblm.primary_task_key]
        if data_keys is None:
            data_keys = [pblm.default_data_key]
        if clf_keys is None:
            clf_keys = [pblm.default_clf_key]

        if pblm.verbose:
            ut.cprint('[pblm] learn_evaluation_classifiers', color='blue')
            ut.cprint('[pblm] task_keys = {}'.format(task_keys))
            ut.cprint('[pblm] data_keys = {}'.format(data_keys))
            ut.cprint('[pblm] clf_keys = {}'.format(clf_keys))

        Prog = ut.ProgPartial(freq=1, adjust=False, prehack='%s')
        task_prog = Prog(task_keys, label='Task')
        for task_key in task_prog:
            dataset_prog = Prog(data_keys, label='Data')
            for data_key in dataset_prog:
                clf_prog = Prog(clf_keys, label='CLF')
                for clf_key in clf_prog:
                    pblm._ensure_evaluation_clf(task_key, data_key, clf_key)

    def _ensure_evaluation_clf(pblm, task_key, data_key, clf_key, use_cache=True):
        """
        Learns and caches an evaluation (cross-validated) classifier and
        tests and caches the results.

        data_key = 'learn(sum,glob)'
        clf_key = 'RF'
        """
        # TODO: add in params used to construct features into the cfgstr
        if hasattr(pblm.samples, 'sample_hashid'):
            ibs = pblm.infr.ibs
            sample_hashid = pblm.samples.sample_hashid()

            feat_dims = pblm.samples.X_dict[data_key].columns.values.tolist()
            # cfg_prefix = sample_hashid + pblm.qreq_.get_cfgstr() + feat_cfgstr

            est_kw1, est_kw2 = pblm._estimator_params(clf_key)
            param_id = ut.get_dict_hashid(est_kw1)
            xval_id = pblm.xval_kw.get_cfgstr()
            cfgstr = '_'.join(
                [
                    sample_hashid,
                    param_id,
                    xval_id,
                    task_key,
                    data_key,
                    clf_key,
                    ut.hashid_arr(feat_dims, 'feats'),
                ]
            )
            fname = 'eval_clfres_' + ibs.dbname
        else:
            fname = 'foo'
            feat_dims = None
            cfgstr = 'bar'
            use_cache = False

        # TODO: ABI class should not be caching
        cacher_kw = dict(appname='vsone_rf_train', enabled=use_cache, verbose=1)
        cacher_clf = ub.Cacher(fname, cfgstr=cfgstr, meta=[feat_dims], **cacher_kw)

        data = cacher_clf.tryload()
        if not data:
            data = pblm._train_evaluation_clf(task_key, data_key, clf_key)
            cacher_clf.save(data)

        clf_list, res_list = data
        labels = pblm.samples.subtasks[task_key]
        combo_res = ClfResult.combine_results(res_list, labels)
        pblm.eval_task_clfs[task_key][clf_key][data_key] = clf_list
        pblm.task_combo_res[task_key][clf_key][data_key] = combo_res

    def _train_evaluation_clf(pblm, task_key, data_key, clf_key, feat_dims=None):
        """
        Learns a cross-validated classifier on the dataset

        Ignore:
            >>> from wbia.algo.verif.vsone import *  # NOQA
            >>> pblm = OneVsOneProblem()
            >>> pblm.load_features()
            >>> pblm.load_samples()
            >>> data_key = 'learn(all)'
            >>> task_key = 'photobomb_state'
            >>> clf_key = 'RF-OVR'
            >>> task_key = 'match_state'
            >>> data_key = pblm.default_data_key
            >>> clf_key = pblm.default_clf_key
        """
        X_df = pblm.samples.X_dict[data_key]
        labels = pblm.samples.subtasks[task_key]
        assert np.all(labels.encoded_df.index == X_df.index)

        clf_partial = pblm._get_estimator(clf_key)
        xval_kw = pblm.xval_kw.asdict()

        clf_list = []
        res_list = []
        skf_list = pblm.samples.stratified_kfold_indices(**xval_kw)
        skf_prog = ut.ProgIter(skf_list, label='skf-train-eval')
        for train_idx, test_idx in skf_prog:
            X_df_train = X_df.iloc[train_idx]
            assert X_df_train.index.tolist() == ut.take(pblm.samples.index, train_idx)
            # train_uv = X_df.iloc[train_idx].index
            # X_train = X_df.loc[train_uv]
            # y_train = labels.encoded_df.loc[train_uv]
            if feat_dims is not None:
                X_df_train = X_df_train[feat_dims]
            X_train = X_df_train.values
            y_train = labels.encoded_df.iloc[train_idx].values.ravel()

            clf = clf_partial()
            clf.fit(X_train, y_train)

            # Note: There is a corner case where one fold doesn't get any
            # labels of a certain class. Because y_train is an encoded integer,
            # the clf.classes_ attribute will cause predictions to agree with
            # other classifiers trained on the same labels.

            # Evaluate results
            res = ClfResult.make_single(
                clf, X_df, test_idx, labels, data_key, feat_dims=feat_dims
            )
            clf_list.append(clf)
            res_list.append(res)
        return clf_list, res_list

    def _external_classifier_result(
        pblm, clf, task_key, data_key, feat_dims=None, test_idx=None
    ):
        """
        Given an external classifier (ensure it's trained on disjoint data),
        evaluate all data on it.

        Args:
            test_idx (list): subset of this classifier to test on
                (defaults to all if None)
        """
        X_df = pblm.samples.X_dict[data_key]
        if test_idx is None:
            test_idx = np.arange(len(X_df))
        labels = pblm.samples.subtasks[task_key]

        res = ClfResult.make_single(
            clf, X_df, test_idx, labels, data_key, feat_dims=feat_dims
        )
        return res

    def learn_deploy_classifiers(pblm, task_keys=None, clf_key=None, data_key=None):
        """
        Learns on data without any train/validation split
        """
        if pblm.verbose > 0:
            ut.cprint('[pblm] learn_deploy_classifiers', color='blue')
        if clf_key is None:
            clf_key = pblm.default_clf_key
        if data_key is None:
            data_key = pblm.default_data_key
        if task_keys is None:
            task_keys = list(pblm.samples.supported_tasks())
        if pblm.deploy_task_clfs is None:
            pblm.deploy_task_clfs = ut.AutoVivification()

        Prog = ut.ProgPartial(freq=1, adjust=False, prehack='%s')
        task_prog = Prog(task_keys, label='Task')
        task_clfs = {}
        for task_key in task_prog:
            clf = pblm._train_deploy_clf(task_key, data_key, clf_key)
            task_clfs[task_key] = clf
            pblm.deploy_task_clfs[task_key][clf_key][data_key] = clf

        return task_clfs

    def _estimator_params(pblm, clf_key):
        est_type = clf_key.split('-')[0]
        if est_type in {'RF', 'RandomForest'}:
            est_kw1 = {
                # 'max_depth': 4,
                'bootstrap': True,
                'class_weight': None,
                'criterion': 'entropy',
                'max_features': 'sqrt',
                # 'max_features': None,
                'min_samples_leaf': 5,
                'min_samples_split': 2,
                # 'n_estimators': 64,
                'n_estimators': 256,
            }
            # Hack to only use missing values if we have the right sklearn
            if 'missing_values' in ut.get_func_kwargs(
                sklearn.ensemble.RandomForestClassifier.__init__
            ):
                est_kw1['missing_values'] = np.nan
            est_kw2 = {
                'random_state': 3915904814,
                'verbose': 0,
                'n_jobs': -1,
            }
        elif est_type in {'SVC', 'SVM'}:
            est_kw1 = dict(kernel='linear')
            est_kw2 = {}
        elif est_type in {'Logit', 'LogisticRegression'}:
            est_kw1 = {}
            est_kw2 = {}
        elif est_type in {'MLP'}:
            est_kw1 = dict(
                activation='relu',
                alpha=1e-05,
                batch_size='auto',
                beta_1=0.9,
                beta_2=0.999,
                early_stopping=False,
                epsilon=1e-08,
                hidden_layer_sizes=(10, 10),
                learning_rate='constant',
                learning_rate_init=0.001,
                max_iter=200,
                momentum=0.9,
                nesterovs_momentum=True,
                power_t=0.5,
                random_state=3915904814,
                shuffle=True,
                solver='lbfgs',
                tol=0.0001,
                validation_fraction=0.1,
                warm_start=False,
            )
            est_kw2 = dict(verbose=False)
        else:
            raise KeyError('Unknown Estimator')
        return est_kw1, est_kw2

    def _get_estimator(pblm, clf_key):
        """
        Returns sklearn classifier
        """
        tup = clf_key.split('-')
        wrap_type = None if len(tup) == 1 else tup[1]
        est_type = tup[0]
        multiclass_wrapper = {
            None: ut.identity,
            'OVR': sklearn.multiclass.OneVsRestClassifier,
            'OVO': sklearn.multiclass.OneVsOneClassifier,
        }[wrap_type]
        est_class = {
            'RF': sklearn.ensemble.RandomForestClassifier,
            'SVC': sklearn.svm.SVC,
            'Logit': sklearn.linear_model.LogisticRegression,
            'MLP': sklearn.neural_network.MLPClassifier,
        }[est_type]

        est_kw1, est_kw2 = pblm._estimator_params(est_type)
        est_params = ut.merge_dicts(est_kw1, est_kw2)

        # steps = []
        # steps.append((est_type, est_class(**est_params)))
        # if wrap_type is not None:
        #     steps.append((wrap_type, multiclass_wrapper))
        if est_type == 'MLP':

            def clf_partial():
                pipe = sklearn.pipeline.Pipeline(
                    [
                        ('inputer', sklearn.impute.SimpleImputer(strategy='mean')),
                        # ('scale', sklearn.preprocessing.StandardScaler),
                        ('est', est_class(**est_params)),
                    ]
                )
                return multiclass_wrapper(pipe)

        elif est_type == 'Logit':

            def clf_partial():
                pipe = sklearn.pipeline.Pipeline(
                    [
                        ('inputer', sklearn.impute.SimpleImputer(strategy='mean')),
                        ('est', est_class(**est_params)),
                    ]
                )
                return multiclass_wrapper(pipe)

        else:

            def clf_partial():
                return multiclass_wrapper(est_class(**est_params))

        return clf_partial

    def _train_deploy_clf(pblm, task_key, data_key, clf_key):
        X_df = pblm.samples.X_dict[data_key]
        labels = pblm.samples.subtasks[task_key]
        assert np.all(labels.encoded_df.index == X_df.index)
        clf_partial = pblm._get_estimator(clf_key)
        logger.info(
            'Training deployment {} classifier on {} for {}'.format(
                clf_key, data_key, task_key
            )
        )
        clf = clf_partial()
        index = X_df.index
        X = X_df.loc[index].values
        y = labels.encoded_df.loc[index].values.ravel()
        clf.fit(X, y)
        return clf

    def _optimize_rf_hyperparams(pblm, data_key=None, task_key=None):
        """
        helper script I've only run interactively

        Example:
            >>> # DISABLE_DOCTEST
            >>> from wbia.algo.verif.vsone import *  # NOQA
            >>> pblm = OneVsOneProblem.from_empty('PZ_PB_RF_TRAIN')
            #>>> pblm = OneVsOneProblem.from_empty('GZ_Master1')
            >>> pblm.load_samples()
            >>> pblm.load_features()
            >>> pblm.build_feature_subsets()
            >>> data_key=None
            >>> task_key=None
        """
        from sklearn.model_selection import RandomizedSearchCV  # NOQA
        from sklearn.model_selection import GridSearchCV  # NOQA
        from sklearn.ensemble import RandomForestClassifier
        from wbia.algo.verif import sklearn_utils

        if data_key is None:
            data_key = pblm.default_data_key
        if task_key is None:
            task_key = pblm.primary_task_key

        # Load data
        X = pblm.samples.X_dict[data_key].values
        y = pblm.samples.subtasks[task_key].y_enc
        groups = pblm.samples.group_ids

        # Define estimator and parameter search space
        grid = {
            'bootstrap': [True, False],
            'class_weight': [None, 'balanced'],
            'criterion': ['entropy', 'gini'],
            # 'max_features': ['sqrt', 'log2'],
            'max_features': ['sqrt'],
            'min_samples_leaf': list(range(2, 11)),
            'min_samples_split': list(range(2, 11)),
            'n_estimators': [8, 64, 128, 256, 512, 1024],
        }
        est = RandomForestClassifier(missing_values=np.nan)
        if False:
            # debug
            params = ut.util_dict.all_dict_combinations(grid)[0]
            est.set_params(verbose=10, n_jobs=1, **params)
            est.fit(X=X, y=y)

        cv = sklearn_utils.StratifiedGroupKFold(n_splits=3)

        if True:
            n_iter = 25
            SearchCV = ut.partial(RandomizedSearchCV, n_iter=n_iter)
        else:
            n_iter = ut.prod(map(len, grid.values()))
            SearchCV = GridSearchCV

        search = SearchCV(est, grid, cv=cv, verbose=10)

        n_cpus = ut.num_cpus()
        thresh = n_cpus * 1.5
        n_jobs_est = 1
        n_jobs_ser = min(n_cpus, n_iter)
        if n_iter < thresh:
            n_jobs_est = int(max(1, thresh / n_iter))
        est.set_params(n_jobs=n_jobs_est)
        search.set_params(n_jobs=n_jobs_ser)

        search.fit(X=X, y=y, groups=groups)

        res = search.cv_results_.copy()
        alias = ut.odict(
            [
                ('rank_test_score', 'rank'),
                ('mean_test_score', 'μ-test'),
                ('std_test_score', 'σ-test'),
                ('mean_train_score', 'μ-train'),
                ('std_train_score', 'σ-train'),
                ('mean_fit_time', 'fit_time'),
                ('params', 'params'),
            ]
        )
        res = ut.dict_subset(res, alias.keys())
        cvresult_df = pd.DataFrame(res).rename(columns=alias)
        cvresult_df = cvresult_df.sort_values('rank').reset_index(drop=True)
        params = pd.DataFrame.from_dict(cvresult_df['params'].values.tolist())
        logger.info('Varied params:')
        logger.info(ut.repr4(ut.map_vals(set, params.to_dict('list'))))
        logger.info('Ranked Params')
        logger.info(params)
        logger.info('Ranked scores on development set:')
        logger.info(cvresult_df)
        logger.info('Best parameters set found on hyperparam set:')
        logger.info('best_params_ = %s' % (ut.repr4(search.best_params_),))
        logger.info('Fastest params')
        cvresult_df.loc[cvresult_df['fit_time'].idxmin()]['params']

    def _dev_calib(pblm):
        """
        interactive script only
        """
        from sklearn.ensemble import RandomForestClassifier
        from sklearn.calibration import CalibratedClassifierCV
        from sklearn.calibration import calibration_curve
        from sklearn.metrics import log_loss, brier_score_loss

        # Load data
        data_key = pblm.default_data_key
        task_key = pblm.primary_task_key
        X = pblm.samples.X_dict[data_key].values
        y = pblm.samples.subtasks[task_key].y_enc
        groups = pblm.samples.group_ids

        # Split into test/train/valid
        cv = sklearn_utils.StratifiedGroupKFold(n_splits=2)
        test_idx, train_idx = next(cv.split(X, y, groups))
        # valid_idx = train_idx[0::2]
        # train_idx = train_idx[1::2]
        # train_valid_idx = np.hstack([train_idx, valid_idx])

        # Train Uncalibrated RF
        est_kw = pblm._estimator_params('RF')[0]
        uncal_clf = RandomForestClassifier(**est_kw)
        uncal_clf.fit(X[train_idx], y[train_idx])
        uncal_probs = uncal_clf.predict_proba(X[test_idx]).T[1]
        uncal_score = log_loss(y[test_idx] == 1, uncal_probs)
        uncal_brier = brier_score_loss(y[test_idx] == 1, uncal_probs)

        # Train Calibrated RF
        method = 'isotonic' if len(test_idx) > 2000 else 'sigmoid'
        precal_clf = RandomForestClassifier(**est_kw)
        # cv = sklearn_utils.StratifiedGroupKFold(n_splits=3)
        cal_clf = CalibratedClassifierCV(precal_clf, cv=2, method=method)
        cal_clf.fit(X[train_idx], y[train_idx])
        cal_probs = cal_clf.predict_proba(X[test_idx]).T[1]
        cal_score = log_loss(y[test_idx] == 1, cal_probs)
        cal_brier = brier_score_loss(y[test_idx] == 1, cal_probs)

        logger.info('cal_brier = %r' % (cal_brier,))
        logger.info('uncal_brier = %r' % (uncal_brier,))

        logger.info('uncal_score = %r' % (uncal_score,))
        logger.info('cal_score = %r' % (cal_score,))

        import wbia.plottool as pt

        ut.qtensure()
        pt.figure()
        ax = pt.gca()
        y_test = y[test_idx] == 1
        fraction_of_positives, mean_predicted_value = calibration_curve(
            y_test, uncal_probs, n_bins=10
        )

        ax.plot([0, 1], [0, 1], 'k:', label='Perfectly calibrated')
        ax.plot(
            mean_predicted_value,
            fraction_of_positives,
            's-',
            label='%s (%1.3f)' % ('uncal-RF', uncal_brier),
        )

        fraction_of_positives, mean_predicted_value = calibration_curve(
            y_test, cal_probs, n_bins=10
        )
        ax.plot(
            mean_predicted_value,
            fraction_of_positives,
            's-',
            label='%s (%1.3f)' % ('cal-RF', cal_brier),
        )
        pt.legend()
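

# --- Illustrative usage sketch (editor addition, not part of the original
# module). Mirrors the doctest in `learn_evaluation_classifiers`: a concrete
# subclass (IrisProblem, defined at the bottom of this file) supplies samples,
# then evaluation and deployment classifiers are learned per task.
def _example_clf_problem_workflow():
    pblm = IrisProblem()
    pblm.setup()
    pblm.eval_task_keys = ['iris']
    pblm.eval_data_keys = ['learn(all)']
    pblm.eval_clf_keys = ['RF']
    pblm.learn_evaluation_classifiers()
    # Cross-validated results, combined over folds
    res = pblm.task_combo_res['iris']['RF']['learn(all)']
    # Classifiers trained on all data (no train/validation split)
    task_clfs = pblm.learn_deploy_classifiers(task_keys=['iris'])
    return res, task_clfs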


@ut.reloadable_class
class ClfResult(ut.NiceRepr):
    r"""
    Handles evaluation statistics for a multiclass classifier trained on a
    specific dataset with specific labels.
    """

    # Attributes that identify the task and data the classifier is evaluated on
    _key_attrs = ['task_key', 'data_key', 'class_names']

    # Attributes about results and labels of individual samples
    _datafame_attrs = ['probs_df', 'probhats_df', 'target_bin_df', 'target_enc_df']

    def __init__(res):
        pass

    def __nice__(res):
        return '{}, {}, {}'.format(res.task_key, res.data_key, len(res.index))

    @property
    def index(res):
        return res.probs_df.index

    @classmethod
    def make_single(ClfResult, clf, X_df, test_idx, labels, data_key, feat_dims=None):
        """
        Make a result for a single cross validation subset
        """
        X_df_test = X_df.iloc[test_idx]
        if feat_dims is not None:
            X_df_test = X_df_test[feat_dims]
        index = X_df_test.index
        # clf_probs = clf.predict_proba(X_df_test)

        # index = pd.Series(test_idx, name='test_idx')
        # Ensure shape corresponds with all classes

        def align_cols(arr, arr_cols, target_cols):
            import utool as ut

            alignx = ut.list_alignment(arr_cols, target_cols, missing=True)
            aligned_arrT = ut.none_take(arr.T, alignx)
            aligned_arrT = ut.replace_nones(aligned_arrT, np.zeros(len(arr)))
            aligned_arr = np.vstack(aligned_arrT).T
            return aligned_arr

        res = ClfResult()
        res.task_key = labels.task_name
        res.data_key = data_key
        res.class_names = ut.lmap(str, labels.class_names)
        res.feat_dims = feat_dims

        res.probs_df = sklearn_utils.predict_proba_df(clf, X_df_test, res.class_names)
        res.target_bin_df = labels.indicator_df.iloc[test_idx]
        res.target_enc_df = labels.encoded_df.iloc[test_idx]

        if hasattr(clf, 'estimators_') and labels.n_classes > 2:
            # The n-th estimator in the OVR classifier predicts the prob of the
            # n-th class (as label 1).
            probs_hat = np.hstack(
                [est.predict_proba(X_df_test)[:, 1:2] for est in clf.estimators_]
            )
            res.probhats_df = pd.DataFrame(
                align_cols(probs_hat, clf.classes_, labels.classes_),
                index=index,
                columns=res.class_names,
            )
            # In the OVR-case, ideally things will sum to 1, but when they
            # don't normalization happens. A Z-value of more than 1 means
            # overconfidence, and under 0 means underconfidence.
            res.confidence_ratio = res.probhats_df.sum(axis=1)
        else:
            res.probhats_df = None
        return res

    def compress(res, flags):
        res2 = ClfResult()
        res2.task_key = res.task_key
        res2.data_key = res.data_key
        res2.class_names = res.class_names
        res2.probs_df = res.probs_df[flags]
        res2.target_bin_df = res.target_bin_df[flags]
        res2.target_enc_df = res.target_enc_df[flags]
        if res.probhats_df is None:
            res2.probhats_df = None
        else:
            res2.probhats_df = res.probhats_df[flags]
            # res2.confidence_ratio = res.confidence_ratio[flags]
        return res2

    @classmethod
    def combine_results(ClfResult, res_list, labels=None):
        """
        Combine results from cross validation runs into a single result
        representing the performance of the entire dataset
        """
        # Ensure that res_lists are not overlapping
        for r1, r2 in ut.combinations(res_list, 2):
            assert (
                len(r1.index.intersection(r2.index)) == 0
            ), 'ClfResult dataframes must be disjoint'
        # sanity check
        for r in res_list:
            assert np.all(r.index == r.probs_df.index)
            assert np.all(r.index == r.target_bin_df.index)
            assert np.all(r.index == r.target_enc_df.index)

        # Combine them with pandas
        res = ClfResult()
        res0 = res_list[0]
        # Transfer single attributes (which should all be the same)
        for attr in ClfResult._key_attrs:
            val = getattr(res0, attr)
            setattr(res, attr, val)
            assert all(
                [getattr(r, attr) == val for r in res_list]
            ), 'ClfResult with different key attributes are incompatible'
        # Combine dataframe properties (which should all have disjoint indices)
        for attr in ClfResult._datafame_attrs:
            if getattr(res0, attr) is not None:
                combo_attr = pd.concat([getattr(r, attr) for r in res_list])
                setattr(res, attr, combo_attr)
            else:
                setattr(res, attr, None)

        for attr in ClfResult._datafame_attrs:
            val = getattr(res, attr)
            if val is not None:
                assert np.all(res.index == val.index), 'index got weird'

        return res

    def hardness_analysis(res, samples, infr=None, method='argmax'):
        """
        samples = pblm.samples

        # TODO MWE with sklearn data

        # ClfResult.make_single(ClfResult, clf, X_df, test_idx, labels,
        #                       data_key, feat_dims=None):

        import sklearn.datasets
        iris = sklearn.datasets.load_iris()

        # TODO: make this setup simpler
        pblm = ClfProblem()
        task_key, clf_key, data_key = 'iris', 'RF', 'learn(all)'
        X_df = pd.DataFrame(iris.data, columns=iris.feature_names)
        samples = MultiTaskSamples(X_df.index)
        samples.apply_indicators({'iris': {name: iris.target == idx
            for idx, name in enumerate(iris.target_names)}})
        samples.X_dict = {'learn(all)': X_df}

        pblm.samples = samples
        pblm.xval_kw['type'] = 'StratifiedKFold'
        clf_list, res_list = pblm._train_evaluation_clf(
            task_key, data_key, clf_key)
        labels = pblm.samples.subtasks[task_key]
        res = ClfResult.combine_results(res_list, labels)

        res.get_thresholds('mcc', 'maximize')

        predict_method = 'argmax'
        """
        meta = {}
        easiness = ut.ziptake(res.probs_df.values, res.target_enc_df.values)

        # pred = sklearn_utils.predict_from_probs(res.probs_df, predict_method)
        if method == 'max-mcc':
            method = res.get_thresholds('mcc', 'maximize')
        pred = sklearn_utils.predict_from_probs(res.probs_df, method, force=True)

        meta['easiness'] = np.array(easiness).ravel()
        meta['hardness'] = 1 - meta['easiness']
        meta['aid1'] = res.probs_df.index.get_level_values(0)
        meta['aid2'] = res.probs_df.index.get_level_values(1)
        # meta['aid1'] = samples.aid_pairs.T[0].take(res.probs_df.index.values)
        # meta['aid2'] = samples.aid_pairs.T[1].take(res.probs_df.index.values)
        # meta['pred'] = res.probs_df.values.argmax(axis=1)
        meta['pred'] = pred.values
        meta['real'] = res.target_enc_df.values.ravel()
        meta['failed'] = meta['pred'] != meta['real']
        meta = pd.DataFrame(meta)
        meta = meta.set_index(['aid1', 'aid2'], drop=False)

        if infr is not None:
            ibs = infr.ibs
            edges = list(meta.index.tolist())
            conf_dict = infr.get_edge_attrs(
                'confidence',
                edges,
                on_missing='filter',
                default=ibs.const.CONFIDENCE.CODE.UNKNOWN,
            )
            conf_df = pd.DataFrame.from_dict(conf_dict, orient='index')
            conf_df = conf_df[0].map(ibs.const.CONFIDENCE.CODE_TO_INT)
            meta = meta.assign(real_conf=conf_df)
            meta['real_conf'] = np.nan_to_num(meta['real_conf']).astype(np.int)

        meta = meta.sort_values('hardness', ascending=False)
        res.meta = meta
        return res.meta

    def missing_classes(res):
        # Find classes that were never predicted
        unique_predictions = np.unique(res.probs_df.values.argmax(axis=1))
        n_classes = len(res.class_names)
        missing_classes = ut.index_complement(unique_predictions, n_classes)
        return missing_classes

    def augment_if_needed(res):
        """
        Adds in dummy values for missing classes
        """
        missing_classes = res.missing_classes()
        n_classes = len(res.class_names)
        y_test_enc_aug = res.target_enc_df.values
        y_test_bin_aug = res.target_bin_df.values
        clf_probs_aug = res.probs_df.values
        sample_weight = np.ones(len(y_test_enc_aug))
        n_missing = len(missing_classes)

        if res.probhats_df is not None:
            clf_probhats_aug = res.probhats_df.values
        else:
            clf_probhats_aug = None

        # Check if augmentation is necessary
        if n_missing > 0:
            missing_bin = np.zeros((n_missing, n_classes))
            missing_bin[(np.arange(n_missing), missing_classes)] = 1.0
            missing_enc = np.array(missing_classes)[:, None]
            y_test_enc_aug = np.vstack([y_test_enc_aug, missing_enc])
            y_test_bin_aug = np.vstack([y_test_bin_aug, missing_bin])
            clf_probs_aug = np.vstack([clf_probs_aug, missing_bin])
            # make sample weights where dummies have no weight
            sample_weight = np.hstack([sample_weight, np.full(n_missing, 0)])

            if res.probhats_df is not None:
                clf_probhats_aug = np.vstack([clf_probhats_aug, missing_bin])

        res.clf_probs = clf_probs_aug
        res.clf_probhats = clf_probhats_aug
        res.y_test_enc = y_test_enc_aug
        res.y_test_bin = y_test_bin_aug
        res.sample_weight = sample_weight

    def extended_clf_report(res, verbose=True):
        res.augment_if_needed()
        pred_enc = res.clf_probs.argmax(axis=1)
        y_pred = pred_enc
        y_true = res.y_test_enc
        sample_weight = res.sample_weight
        target_names = res.class_names
        report = sklearn_utils.classification_report2(
            y_true,
            y_pred,
            target_names=target_names,
            sample_weight=sample_weight,
            verbose=verbose,
        )
        return report

    def print_report(res):
        res.augment_if_needed()
        pred_enc = res.clf_probs.argmax(axis=1)

        res.extended_clf_report()

        report = sklearn.metrics.classification_report(
            y_true=res.y_test_enc,
            y_pred=pred_enc,
            target_names=res.class_names,
            sample_weight=res.sample_weight,
        )
        logger.info('Precision/Recall Report:')
        logger.info(report)

    def get_thresholds(res, metric='mcc', value='maximize'):
        """
        get_metric = 'thresholds'
        at_metric = metric = 'mcc'
        at_value = value = 'maximize'

        a = []
        b = []
        for x in np.linspace(0, 1, 1000):
            a += [cfms.get_metric_at_metric('thresholds', 'fpr', x, subindex=True)]
            b += [cfms.get_thresh_at_metric('fpr', x)]
        a = np.array(a)
        b = np.array(b)
        d = (a - b)
        logger.info((d.min(), d.max()))
        """
        threshes = {}
        for class_name in res.class_names:
            cfms = res.confusions(class_name)
            thresh = cfms.get_metric_at_metric('thresh', metric, value)
            threshes[class_name] = thresh
        return threshes

    @profile
    def get_pos_threshes(
        res,
        metric='fpr',
        value=1e-4,
        maximize=False,
        warmup=200,
        priors=None,
        min_thresh=0.5,
    ):
        """
        Finds a threshold that achieves the desired `value` for the desired
        metric, while maximizing or minimizing the threshold.

        For positive classification you want to minimize the threshold.
        Priors can be passed in to augment probabilities depending on support.
        By default a class prior is 1 for threshold minimization and 0 for
        maximization.
        """
        pos_threshes = {}
        if priors is None:
            priors = {name: float(not maximize) for name in res.class_names}
        for class_name in res.class_names:
            cfms = res.confusions(class_name)

            learned_thresh = cfms.get_metric_at_metric('thresh', metric, value)
            # learned_thresh = cfms.get_thresh_at_metric(
            #     metric, value, maximize=maximize)

            prior_thresh = priors[class_name]
            n_support = cfms.n_pos

            if warmup is not None:
                """
                python -m wbia.plottool.draw_func2 plot_func --show --range=0,1 \
                        --func="lambda x: np.maximum(0, (x - .6) / (1 - .6))"
                """
                # If n_support < warmup: then interpolate to learned thresh
                nmax = warmup if isinstance(warmup, int) else warmup[class_name]
                # alpha varies from 0 to 1
                alpha = min(nmax, n_support) / nmax
                # transform alpha through nonlinear function (similar to ReLU)
                p = 0.6  # transition point
                alpha = max(0, (alpha - p) / (1 - p))
                thresh = prior_thresh * (1 - alpha) + learned_thresh * (alpha)
            else:
                thresh = learned_thresh
            pos_threshes[class_name] = max(min_thresh, thresh)
        return pos_threshes

    def report_thresholds(res, warmup=200):
        # import vtool as vt
        ut.cprint('Threshold Report', 'yellow')
        y_test_bin = res.target_bin_df.values
        # y_test_enc = y_test_bin.argmax(axis=1)
        # clf_probs = res.probs_df.values

        # The maximum allowed false positive rate
        # We expect that we will make 1 error every 1,000 decisions
        # thresh_df['foo'] = [1, 2, 3]
        # thresh_df['foo'][res.class_names[k]] = 1
        # for k in [2, 0, 1]:
        choice_mv = ut.odict(
            [
                ('@fpr=.01', ('fpr', 0.01)),
                ('@fpr=.001', ('fpr', 0.001)),
                ('@fpr=.0001', ('fpr', 1e-4)),
                ('@fpr=.0000', ('fpr', 0)),
                ('@max(mcc)', ('mcc', 'max')),
                # (class_name + '@max(acc)', ('acc', 'max')),
                # (class_name + '@max(mk)', ('mk', 'max')),
                # (class_name + '@max(bm)', ('bm', 'max')),
            ]
        )
        for k in range(y_test_bin.shape[1]):
            thresh_dict = ut.odict()
            class_name = res.class_names[k]
            cfms = res.confusions(class_name)
            # probs, labels = clf_probs.T[k], y_test_bin.T[k]
            # cfms = vt.ConfusionMetrics().fit(probs, labels)

            for k, mv in choice_mv.items():
                metric, value = mv
                idx = cfms.get_index_at_metric(metric, value)
                key = class_name + k
                thresh_dict[key] = ut.odict()
                for metric in ['thresh', 'fpr', 'tpr', 'tpa', 'bm', 'mk', 'mcc']:
                    thresh_dict[key][metric] = cfms.get_metric_at_index(metric, idx)

            thresh_df = pd.DataFrame.from_dict(thresh_dict, orient='index')
            thresh_df = thresh_df.loc[list(thresh_dict.keys())]
            if cfms.n_pos > 0 and cfms.n_neg > 0:
                logger.info('Raw 1vR {} Thresholds'.format(class_name))
                logger.info(ut.indent(thresh_df.to_string(float_format='{:.4f}'.format)))
                # chosen_type = class_name + '@fpr=0'
                # pos_threshes[class_name] = thresh_df.loc[chosen_type]['thresh']

        for choice_k, choice_mv in iter(choice_mv.items()):
            metric, value = choice_mv
            pos_threshes = res.get_pos_threshes(metric, value, warmup=warmup)
            logger.info('Choosing threshold based on %s' % (choice_k,))
            res.report_auto_thresholds(pos_threshes)

    def report_auto_thresholds(res, threshes, verbose=True):
        report_lines = []
        print_ = report_lines.append
        print_(
            'Chosen thresholds = %s'
            % (ut.repr2(threshes, nl=1, precision=4, align=True),)
        )

        res.augment_if_needed()
        target_names = res.class_names
        sample_weight = res.sample_weight
        y_true = res.y_test_enc.ravel()

        y_pred, can_autodecide = sklearn_utils.predict_from_probs(
            res.clf_probs,
            threshes,
            res.class_names,
            force=False,
            multi=False,
            return_flags=True,
        )
        can_autodecide[res.sample_weight == 0] = False

        auto_pred = y_pred[can_autodecide].astype(np.int)
        auto_true = y_true[can_autodecide].ravel()
        auto_probs = res.clf_probs[can_autodecide]

        total_cases = int(sample_weight.sum())
        print_('Will autodecide for %r/%r cases' % (can_autodecide.sum(), (total_cases)))

        def frac_str(a, b):
            return '{:}/{:} = {:.2f}%'.format(int(a), int(b), a / b)

        y_test_bin = res.target_bin_df.values
        supported_class_idxs = [k for k, y in enumerate(y_test_bin.T) if y.sum() > 0]

        print_(' * Auto-Decide Per-Class Summary')
        for k in supported_class_idxs:
            # Look at fail/succs in threshold
            name = res.class_names[k]
            # number of times this class appears overall
            n_total_k = (y_test_bin.T[k]).sum()
            # get the cases where this class was predicted
            auto_true_k = auto_true == k
            auto_pred_k = auto_pred == k
            # number of cases auto predicted
            n_pred_k = auto_pred_k.sum()
            # number of times auto was right
            n_tp = (auto_true_k & auto_pred_k).sum()
            # number of times auto was wrong
            n_fp = (~auto_true_k & auto_pred_k).sum()

            fail_str = frac_str(n_fp, n_pred_k)
            pass_str = frac_str(n_tp, n_total_k)
            fmtstr = '\n'.join(
                [
                    '{name}:',
                    '    {n_total_k} samples existed, and did {n_pred_k} auto predictions',
                    '    got {pass_str} right',
                    '    made {fail_str} errors',
                ]
            )
            print_(ut.indent(fmtstr.format(**locals())))

        report = sklearn_utils.classification_report2(
            y_true,
            y_pred,
            target_names=target_names,
            sample_weight=can_autodecide.astype(np.float),
            verbose=False,
        )
        print_(' * Auto-Decide Confusion')
        print_(ut.indent(str(report['confusion'])))
        print_(' * Auto-Decide Metrics')
        print_(ut.indent(str(report['metrics'])))
        if 'mcc' in report:
            print_(ut.indent(str(report['mcc'])))

        try:
            auto_truth_bin = res.y_test_bin[can_autodecide]
            for k in supported_class_idxs:
                auto_truth_k = auto_truth_bin.T[k]
                auto_probs_k = auto_probs.T[k]
                if auto_probs_k.sum():
                    auc = sklearn.metrics.roc_auc_score(auto_truth_k, auto_probs_k)
                    print_(
                        ' * Auto AUC(Macro): {:.4f} for class={}'.format(
                            auc, res.class_names[k]
                        )
                    )
        except ValueError:
            pass
        report = '\n'.join(report_lines)
        if verbose:
            logger.info(report)
        return report

    def confusions(res, class_name):
        import vtool as vt

        y_test_bin = res.target_bin_df.values
        clf_probs = res.probs_df.values
        k = res.class_names.index(class_name)
        probs, labels = clf_probs.T[k], y_test_bin.T[k]
        confusions = vt.ConfusionMetrics().fit(probs, labels)
        return confusions

    def ishow_roc(res):
        import vtool as vt
        import wbia.plottool as pt

        ut.qtensure()
        y_test_bin = res.target_bin_df.values
        # The maximum allowed false positive rate
        # We expect that we will make 1 error every 1,000 decisions
        # thresh_df['foo'] = [1, 2, 3]
        # thresh_df['foo'][res.class_names[k]] = 1
        # for k in [2, 0, 1]:
        for k in range(y_test_bin.shape[1]):
            if y_test_bin.shape[1] == 2 and k == 0:
                # only show one in the binary case
                continue
            class_name = res.class_names[k]
            confusions = res.confusions(class_name)
            ROCInteraction = vt.interact_roc_factory(
                confusions, show_operating_point=True
            )
            fnum = pt.ensure_fnum(k)
            # ROCInteraction.static_plot(fnum, None, name=class_name)
            inter = ROCInteraction(fnum=fnum, pnum=None, name=class_name)
            inter.start()
        # if False:
        #     X = probs
        #     y = labels
        #     encoder = vt.ScoreNormalizer()
        #     encoder.fit(probs, labels)
        #     learn_thresh = encoder.learn_threshold2()
        #     encoder.inverse_normalize(learn_thresh)
        #     encoder.visualize(fnum=k)
        pass

    def show_roc(res, class_name, **kwargs):
        import vtool as vt

        labels = res.target_bin_df[class_name].values
        probs = res.probs_df[class_name].values
        confusions = vt.ConfusionMetrics().fit(probs, labels)
        confusions.draw_roc_curve(**kwargs)

    def roc_scores_ovr_hat(res):
        res.augment_if_needed()
        for k in range(len(res.class_names)):
            class_k_truth = res.y_test_bin.T[k]
            class_k_probs = res.probhats_df.values.T[k]
            auc = sklearn.metrics.roc_auc_score(class_k_truth, class_k_probs)
            yield auc

    def roc_scores_ovr(res):
        res.augment_if_needed()
        for k in range(res.y_test_bin.shape[1]):
            class_k_truth = res.y_test_bin.T[k]
            class_k_probs = res.clf_probs.T[k]
            auc = sklearn.metrics.roc_auc_score(class_k_truth, class_k_probs)
            yield auc

    def confusions_ovr(res):
        # one_vs_rest confusions
        import vtool as vt

        res.augment_if_needed()
        for k in range(res.y_test_bin.shape[1]):
            class_k_truth = res.y_test_bin.T[k]
            class_k_probs = res.clf_probs.T[k]
            cfms = vt.ConfusionMetrics().fit(class_k_probs, class_k_truth)
            # auc = sklearn.metrics.roc_auc_score(class_k_truth, class_k_probs)
            yield res.class_names[k], cfms

    def roc_score(res):
        res.augment_if_needed()
        auc_learn = sklearn.metrics.roc_auc_score(res.y_test_bin, res.clf_probs)
        return auc_learn
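

# --- Illustrative usage sketch (editor addition, not part of the original
# module). Typical inspection calls on a combined ClfResult, e.g. one pulled
# from `pblm.task_combo_res`. The threshold helpers go through
# `res.confusions`, which requires vtool's ConfusionMetrics to be available.
def _example_clf_result_inspection(res):
    res.print_report()                                 # precision/recall summary
    threshes = res.get_thresholds('mcc', 'maximize')   # per-class operating points
    pos_threshes = res.get_pos_threshes(metric='fpr', value=1e-4)
    aucs = list(res.roc_scores_ovr())                  # one-vs-rest AUC per class
    return threshes, pos_threshes, aucs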


@ut.reloadable_class
class MultiTaskSamples(ut.NiceRepr):
    """
    Handles samples (i.e. feature-label pairs) with a combination of
    non-mutually exclusive subclassification labels

    CommandLine:
        python -m wbia.algo.verif.clf_helpers MultiTaskSamples

    Example:
        >>> # ENABLE_DOCTEST
        >>> from wbia.algo.verif.clf_helpers import *  # NOQA
        >>> samples = MultiTaskSamples([0, 1, 2, 3])
        >>> tasks_to_indicators = ut.odict([
        >>>     ('task1', ut.odict([
        >>>         ('state1', [0, 0, 0, 1]),
        >>>         ('state2', [0, 0, 1, 0]),
        >>>         ('state3', [1, 1, 0, 0]),
        >>>     ])),
        >>>     ('task2', ut.odict([
        >>>         ('state4', [0, 0, 0, 1]),
        >>>         ('state5', [1, 1, 1, 0]),
        >>>     ]))
        >>> ])
        >>> samples.apply_indicators(tasks_to_indicators)
    """

    def __init__(samples, index):
        samples.index = index
        samples.subtasks = ut.odict()

    # def set_simple_scores(samples, simple_scores):
    #     if simple_scores is not None:
    #         edges = ut.emap(tuple, samples.aid_pairs.tolist())
    #         assert (edges == simple_scores.index.tolist())
    #     samples.simple_scores = simple_scores

    # def set_feats(samples, X_dict):
    #     if X_dict is not None:
    #         edges = ut.emap(tuple, samples.aid_pairs.tolist())
    #         for X in X_dict.values():
    #             assert np.all(edges == X.index.tolist())
    #     samples.X_dict = X_dict

    def supported_tasks(samples):
        for task_key, labels in samples.subtasks.items():
            labels = samples.subtasks[task_key]
            if labels.has_support():
                yield task_key

    def apply_indicators(samples, tasks_to_indicators):
        """
        Adds labels for a specific task

        Args:
            tasks_to_indicators (dict): takes the form:
                {
                    'my_task_name1': {
                        'class1': [list of bools indicating class membership],
                        ...
                        'classN': [list of bools indicating class membership],
                    },
                    ...
                    'my_task_nameN': ...
                }
        """
        n_samples = None
        samples.n_tasks = len(tasks_to_indicators)
        for task_name, indicator in tasks_to_indicators.items():
            labels = MultiClassLabels.from_indicators(
                indicator, task_name=task_name, index=samples.index
            )
            samples.subtasks[task_name] = labels
            if n_samples is None:
                n_samples = labels.n_samples
            elif n_samples != labels.n_samples:
                raise ValueError('number of samples is different')
        samples.n_samples = n_samples

    def apply_encoded_labels(samples, y_enc, class_names, task_name):
        """
        Adds labels for a specific task. Alternative to `apply_indicators`

        Args:
            y_enc (list): integer label indicating the class for each sample
            class_names (list): list of strings indicating the class-domain
            task_name (str): key for denoting this specific task
        """
        # convert to indicator structure and use that
        tasks_to_indicators = ut.odict(
            [
                (
                    task_name,
                    ut.odict(
                        [
                            (name, np.array(y_enc) == i)
                            for i, name in enumerate(class_names)
                        ]
                    ),
                )
            ]
        )
        samples.apply_indicators(tasks_to_indicators)

    # @ut.memoize
    def encoded_2d(samples):
        encoded_2d = pd.concat([v.encoded_df for k, v in samples.items()], axis=1)
        return encoded_2d

    def class_name_basis(samples):
        """corresponds with indexes returned from encoded1d"""
        class_name_basis = [
            t[::-1]
            for t in ut.product(*[v.class_names for k, v in samples.items()][::-1])
        ]
        # class_name_basis = [(b, a) for a, b in ut.product(*[
        #     v.class_names for k, v in samples.items()][::-1])]
        return class_name_basis

    def class_idx_basis_2d(samples):
        """2d-index version of class_name_basis"""
        class_idx_basis_2d = [
            (b, a)
            for a, b in ut.product(
                *[range(v.n_classes) for k, v in samples.items()][::-1]
            )
        ]
        return class_idx_basis_2d

    def class_idx_basis_1d(samples):
        """1d-index version of class_name_basis"""
        n_states = np.prod([v.n_classes for k, v in samples.items()])
        class_idx_basis_1d = np.arange(n_states, dtype=np.int)
        return class_idx_basis_1d

    # @ut.memoize
    def encoded_1d(samples):
        """Returns a unique label for each combination of samples"""
        # from sklearn.preprocessing import MultiLabelBinarizer
        encoded_2d = samples.encoded_2d()
        class_space = [v.n_classes for k, v in samples.items()]
        offsets = np.array([1] + np.cumprod(class_space).tolist()[:-1])[None, :]
        encoded_1d = (offsets * encoded_2d).sum(axis=1)
        # e = MultiLabelBinarizer()
        # bin_coeff = e.fit_transform(encoded_2d)
        # bin_basis = (2 ** np.arange(bin_coeff.shape[1]))[None, :]
        # # encoded_1d = (bin_coeff * bin_basis).sum(axis=1)
        # encoded_1d = (bin_coeff * bin_basis[::-1]).sum(axis=1)
        # # vt.unique_rows(sklearn.preprocessing.MultiLabelBinarizer().fit_transform(encoded_2d))
        # [v.encoded_df.values for k, v in samples.items()]
        # encoded_df_1d = pd.concat([v.encoded_df for k, v in samples.items()], axis=1)
        return encoded_1d

    def __nice__(samples):
        return 'nS=%r, nT=%r' % (len(samples), samples.n_tasks)

    def __getitem__(samples, task_key):
        return samples.subtasks[task_key]

    def __len__(samples):
        return samples.n_samples

    def print_info(samples):
        for task_name, labels in samples.items():
            labels.print_info()
        logger.info('hist(all) = %s' % (ut.repr4(samples.make_histogram())))
        logger.info('len(all) = %s' % (len(samples)))

    def make_histogram(samples):
        """label histogram"""
        class_name_basis = samples.class_name_basis()
        class_idx_basis_1d = samples.class_idx_basis_1d()
        # logger.info('class_idx_basis_1d = %r' % (class_idx_basis_1d,))
        # logger.info(samples.encoded_1d())
        multi_task_idx_hist = ut.dict_hist(
            samples.encoded_1d().values, labels=class_idx_basis_1d
        )
        multi_task_hist = ut.map_keys(lambda k: class_name_basis[k], multi_task_idx_hist)
        return multi_task_hist

    def items(samples):
        for task_name, labels in samples.subtasks.items():
            yield task_name, labels

    # def take(samples, idxs):
    #     mask = ut.index_to_boolmask(idxs, len(samples))
    #     return samples.compress(mask)

    @property
    def group_ids(samples):
        return None

    def stratified_kfold_indices(samples, **xval_kw):
        """
        TODO: check xval label frequency
        """
        from sklearn import model_selection

        X = np.empty((len(samples), 0))
        y = samples.encoded_1d().values
        groups = samples.group_ids

        type_ = xval_kw.pop('type', 'StratifiedGroupKFold')
        if type_ == 'StratifiedGroupKFold':
            assert groups is not None
            # FIXME: The StratifiedGroupKFold could be implemented better.
            splitter = sklearn_utils.StratifiedGroupKFold(**xval_kw)
            skf_list = list(splitter.split(X=X, y=y, groups=groups))
        elif type_ == 'StratifiedKFold':
            splitter = model_selection.StratifiedKFold(**xval_kw)
            skf_list = list(splitter.split(X=X, y=y))
        return skf_list

    def subsplit_indices(samples, subset_idx, **xval_kw):
        """split an existing set"""
        from sklearn import model_selection

        X = np.empty((len(subset_idx), 0))
        y = samples.encoded_1d().values[subset_idx]
        groups = samples.group_ids[subset_idx]

        xval_kw_ = xval_kw.copy()
        if 'n_splits' not in xval_kw_:
            xval_kw_['n_splits'] = 3
        type_ = xval_kw_.pop('type', 'StratifiedGroupKFold')
        if type_ == 'StratifiedGroupKFold':
            assert groups is not None
            # FIXME: The StratifiedGroupKFold could be implemented better.
            splitter = sklearn_utils.StratifiedGroupKFold(**xval_kw_)
            rel_skf_list = list(splitter.split(X=X, y=y, groups=groups))
        elif type_ == 'StratifiedKFold':
            splitter = model_selection.StratifiedKFold(**xval_kw_)
            rel_skf_list = list(splitter.split(X=X, y=y))

        # map back into original coords
        skf_list = [
            (subset_idx[rel_idx1], subset_idx[rel_idx2])
            for rel_idx1, rel_idx2 in rel_skf_list
        ]

        for idx1, idx2 in skf_list:
            assert len(np.intersect1d(subset_idx, idx1)) == len(idx1)
            assert len(np.intersect1d(subset_idx, idx2)) == len(idx2)
            # assert
        return skf_list
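

# --- Illustrative usage sketch (editor addition, not part of the original
# module). Assumes `samples` already had `apply_indicators` called on it (see
# the MultiTaskSamples docstring) and has enough samples per joint class for
# the requested number of splits.
def _example_multitask_samples_usage(samples):
    hist = samples.make_histogram()   # joint histogram over all task labels
    encoded = samples.encoded_1d()    # one integer code per label combination
    folds = samples.stratified_kfold_indices(type='StratifiedKFold', n_splits=3)
    return hist, encoded, folds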


@ut.reloadable_class
class MultiClassLabels(ut.NiceRepr):
    """
    Used by samples to encode a single set of mutually exclusive labels.
    These can either be binary or multiclass.

        import pandas as pd
        pd.options.display.max_rows = 10
        # pd.options.display.max_rows = 20
        pd.options.display.max_columns = 40
        pd.options.display.width = 160
    """

    def __init__(labels):
        # Helper Info
        labels.task_name = None
        labels.n_samples = None
        labels.n_classes = None
        labels.class_names = None
        labels.classes_ = None
        # Core data
        labels.indicator_df = None
        labels.encoded_df = None
        labels.default_class = None

    def has_support(labels):
        return len(labels.make_histogram()) > 1

    def lookup_class_idx(labels, class_name):
        return ut.dzip(labels.class_names, labels.classes_)[class_name]

    @classmethod
    def from_indicators(MultiClassLabels, indicator, index=None, task_name=None):
        labels = MultiClassLabels()
        n_samples = len(next(iter(indicator.values())))
        # if index is None:
        #     index = pd.Series(np.arange(n_samples), name='index')
        indicator_df = pd.DataFrame(indicator, index=index)
        assert np.all(
            indicator_df.sum(axis=1).values
        ), 'states in the same task must be mutually exclusive'
        labels.indicator_df = indicator_df
        labels.class_names = indicator_df.columns.values
        labels.encoded_df = pd.DataFrame(
            indicator_df.values.argmax(axis=1), columns=[task_name], index=index
        )
        labels.task_name = task_name
        labels.n_samples = n_samples
        labels.n_classes = len(labels.class_names)
        if labels.n_classes == 1:
            labels.n_classes = 2  # 1 column means binary case
        labels.classes_ = np.arange(labels.n_classes)
        labels.default_class_name = labels.class_names[1]
        return labels

    @property
    def target_type(labels):
        return sklearn.utils.multiclass.type_of_target(labels.y_enc)

    def one_vs_rest_task_names(labels):
        return [
            labels.task_name + '(' + labels.class_names[k] + '-v-rest)'
            for k in range(labels.n_classes)
        ]

    def gen_one_vs_rest_labels(labels):
        """
        Example:
            >>> # ENABLE_DOCTEST
            >>> from wbia.algo.verif.clf_helpers import *  # NOQA
            >>> indicator = ut.odict([
            >>>     ('state1', [0, 0, 0, 1]),
            >>>     ('state2', [0, 0, 1, 0]),
            >>>     ('state3', [1, 1, 0, 0]),
            >>> ])
            >>> labels = MultiClassLabels.from_indicators(indicator, task_name='task1')
            >>> sublabels = list(labels.gen_one_vs_rest_labels())
            >>> sublabel = sublabels[0]
        """
        if labels.target_type == 'binary':
            yield labels
            return
        task_names_1vR = labels.one_vs_rest_task_names()
        for k in range(labels.n_classes):
            class_name = labels.class_names[k]
            task_name = task_names_1vR[k]
            index = labels.indicator_df.index
            indicator_df = pd.DataFrame()
            indicator_df['not-' + class_name] = 1 - labels.indicator_df[class_name]
            indicator_df[class_name] = labels.indicator_df[class_name]
            indicator_df.index = index
            # indicator = labels.encoded_df == k
            # indicator.rename(columns={indicator.columns[0]: class_name}, inplace=True)
            n_samples = len(indicator_df)
            sublabel = MultiClassLabels()
            sublabel.indicator_df = indicator_df
            sublabel.class_names = indicator_df.columns.values
            # if len(indicator_df.columns) == 1:
            #     sublabel.encoded_df = pd.DataFrame(
            #         indicator_df.values.T[0],
            #         columns=[task_name]
            #     )
            # else:
            sublabel.encoded_df = pd.DataFrame(
                indicator_df.values.argmax(axis=1), columns=[task_name], index=index
            )
            sublabel.task_name = task_name
            sublabel.n_samples = n_samples
            sublabel.n_classes = len(sublabel.class_names)
            # if sublabel.n_classes == 1:
            #     sublabel.n_classes = 2  # 1 column means binary case
            sublabel.classes_ = np.arange(sublabel.n_classes)

            # sublabel = MultiClassLabels.from_indicators(indicator,
            #     task_name=subname, index=samples.index)
            yield sublabel

    @property
    def y_bin(labels):
        return labels.indicator_df.values

    @property
    def y_enc(labels):
        return labels.encoded_df.values.ravel()

    def __nice__(labels):
        parts = []
        if labels.task_name is not None:
            parts.append(labels.task_name)
        parts.append('nD=%r' % (labels.n_samples))
        parts.append('nC=%r' % (labels.n_classes))
        return ' '.join(parts)

    def __len__(labels):
        return labels.n_samples

    def make_histogram(labels):
        class_idx_hist = ut.dict_hist(labels.y_enc)
        class_hist = ut.map_keys(lambda idx: labels.class_names[idx], class_idx_hist)
        return class_hist

    def print_info(labels):
        logger.info(
            'hist(%s) = %s' % (labels.task_name, ut.repr4(labels.make_histogram()))
        )
        logger.info('len(%s) = %s' % (labels.task_name, len(labels)))
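

# --- Illustrative usage sketch (editor addition, not part of the original
# module). Mirrors the doctest in `gen_one_vs_rest_labels`: indicator columns
# become class names and the argmax over columns gives the encoded labels.
def _example_multiclass_labels_usage():
    indicator = ut.odict([
        ('state1', [0, 0, 0, 1]),
        ('state2', [0, 0, 1, 0]),
        ('state3', [1, 1, 0, 0]),
    ])
    labels = MultiClassLabels.from_indicators(indicator, task_name='task1')
    hist = labels.make_histogram()   # counts per class name, keyed by class name
    sublabels = list(labels.gen_one_vs_rest_labels())
    return labels, hist, sublabels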


class IrisProblem(ClfProblem):
    """
    Simple demo using the abstract clf problem to work on the iris dataset.

    Example:
        >>> # ENABLE_DOCTEST
        >>> from wbia.algo.verif.clf_helpers import *  # NOQA
        >>> pblm = IrisProblem()
        >>> pblm.setup()
        >>> pblm.samples
    """

    def setup(pblm):
        import sklearn.datasets

        iris = sklearn.datasets.load_iris()

        pblm.primary_task_key = 'iris'
        pblm.default_data_key = 'learn(all)'
        pblm.default_clf_key = 'RF'

        X_df = pd.DataFrame(iris.data, columns=iris.feature_names)
        samples = MultiTaskSamples(X_df.index)
        samples.apply_indicators(
            {
                'iris': {
                    name: iris.target == idx
                    for idx, name in enumerate(iris.target_names)
                }
            }
        )
        samples.X_dict = {'learn(all)': X_df}

        pblm.samples = samples
        pblm.xval_kw['type'] = 'StratifiedKFold'