# Source code for wbia.algo.graph.mixin_loops

# -*- coding: utf-8 -*-
import logging
import numpy as np
import utool as ut
import ubelt as ub
import pandas as pd
import itertools as it
import wbia.constants as const
from wbia.algo.graph.state import POSTV, NEGTV, INCMP, NULL
from wbia.algo.graph.refresh import RefreshCriteria

# utool module injection. Rebinding ``print`` at module level shadows the
# builtin for this module; ``profile`` is used as a decorator further down
# (presumably a no-op unless utool profiling is enabled -- TODO confirm);
# ``rrr`` presumably supports module reloading -- verify against utool docs.
print, rrr, profile = ut.inject2(__name__)
logger = logging.getLogger('wbia')


# Optional collection of annotation ids used as an include-filter by a
# (currently disabled, ``if False and ...``) edge filter in pos_redun_gen.
# None means "no filtering".
PRINCETON_KAIA_EDGE_LIST = None


class InfrLoops(object):
    """
    Algorithm control flow loops.

    Mixin for the annotation-inference object (referred to as ``infr``). The
    public entry point is :meth:`main_gen`, which drives the phase
    subiterators defined here and yields whenever a manual review is needed.
    """

    def main_gen(infr, max_loops=None, use_refresh=True):
        """
        The main outer loop.

        This function is designed as an iterator that will execute the graph
        algorithm main loop as automatically as possible, but if user input
        is needed, it will pause and yield the decision it needs help with.
        Once feedback is given for this item, you can continue the main loop
        by calling next. StopIteration is raised once the algorithm is
        complete.

        Args:
            max_loops (int): maximum number of times to run the outer loop,
                i.e. ranking is run at most this many times. If None, the
                value of ``infr.params['algo.max_outer_loops']`` is used; if
                that is also None the outer loop is unbounded.
            use_refresh (bool): allow the refresh criterion to stop the algo

        Yields:
            list: review requests, each a list of ``(edge, priority, data)``
            tuples as built by ``_make_review_tuple``. The first tuple is the
            most important one.

        Returns:
            str: ``'finished:main'`` (delivered as ``StopIteration.value``).

        Notes:
            Different phases of the main loop are implemented as subiterators

        CommandLine:
            python -m wbia.algo.graph.mixin_loops main_gen

        Doctest:
            >>> # xdoctest: +REQUIRES(--slow)
            >>> from wbia.algo.graph.mixin_loops import *
            >>> from wbia.algo.graph.mixin_simulation import UserOracle
            >>> import wbia
            >>> infr = wbia.AnnotInference('testdb1', aids='all',
            >>>                            autoinit='staging', verbose=4)
            >>> infr.params['manual.n_peek'] = 10
            >>> infr.params['ranking.ntop'] = 1
            >>> infr.oracle = UserOracle(.99, rng=0)
            >>> infr.simulation_mode = False
            >>> infr.reset()
            >>> #infr.load_published()
            >>> gen = infr.main_gen()
            >>> while True:
            >>>     try:
            >>>         reviews = next(gen)
            >>>         edge, priority, data = reviews[0]
            >>>         feedback = infr.request_oracle_review(edge)
            >>>         infr.add_feedback(edge, **feedback)
            >>>     except StopIteration:
            >>>         break
        """
        infr.print('Starting main loop', 1)
        infr.print('infr.params = {}'.format(ut.repr3(infr.params)))
        if max_loops is None:
            max_loops = infr.params['algo.max_outer_loops']
            if max_loops is None:
                max_loops = np.inf

        if infr.test_mode:
            logger.info('------------------ {} -------------------'.format(infr.name))

        # Initialize a refresh criteria
        infr.init_refresh()

        infr.phase = 0
        infr.print('Entering Phase 0', 1, color='red')

        # Phase 0.1: Ensure the user sees something immediately
        if infr.params['algo.quickstart']:
            infr.loop_phase = 'quickstart_init'
            # quick startup. Yield a bunch of random edges
            num = infr.params['manual.n_peek']
            for edge in ut.random_combinations(infr.aids, 2, num=num):
                # Wrap in a list so quickstart requests have the same shape
                # as the requests produced by emit_manual_review.
                yield [infr._make_review_tuple(edge, None)]

        if infr.params['algo.hardcase']:
            infr.loop_phase = 'hardcase_init'
            # Check previously labeled edges where the groundtruth and the
            # verifier disagree.
            yield from infr.hardcase_review_gen()

        if infr.params['inference.enabled']:
            infr.loop_phase = 'incon_recover_init'
            # First, fix any inconsistencies
            yield from infr.incon_recovery_gen()

        # Phase 0.2: Ensure positive redundancy (this is generally quick)
        # so the user starts seeing real work after one random review is made
        # unless the graph is already positive redundant.
        if infr.params['redun.enabled'] and infr.params['redun.enforce_pos']:
            infr.loop_phase = 'pos_redun_init'
            # Fix positive redundancy of anything within the loop
            yield from infr.pos_redun_gen()

        infr.phase = 1
        infr.print('Entering Phase 1', 1, color='red')
        if infr.params['ranking.enabled']:
            for count in it.count(0):
                infr.print('Outer loop iter %d ' % (count,))

                # Phase 1: Try to merge PCCs by searching for LNBNN candidates
                infr.loop_phase = 'ranking_{}'.format(count)
                yield from infr.ranked_list_gen(use_refresh)

                # Stop after this iteration if the last ranking round
                # produced no meaningful reviews.
                terminate = infr.refresh.num_meaningful == 0
                if terminate:
                    infr.print('Triggered break criteria', 1, color='red')

                # Phase 2: Ensure positive redundancy.
                infr.phase = 2
                infr.print('Entering Phase 2', 1, color='red')
                infr.loop_phase = 'posredun_{}'.format(count)
                if all(ut.take(infr.params, ['redun.enabled', 'redun.enforce_pos'])):
                    # Fix positive redundancy of anything within the loop
                    yield from infr.pos_redun_gen()

                logger.info('prob_any_remain = %r', infr.refresh.prob_any_remain())
                logger.info(
                    'infr.refresh.num_meaningful = %r', infr.refresh.num_meaningful
                )

                if (count + 1) >= max_loops:
                    infr.print('early stop', 1, color='red')
                    break

                if terminate:
                    infr.print('break triggered')
                    break

        infr.phase = 3
        infr.print('Entering Phase 3', 1, color='red')
        if all(ut.take(infr.params, ['redun.enabled', 'redun.enforce_neg'])):
            # Phase 3: Try to automatically achieve negative redundancy without
            # asking the user to do anything but resolve inconsistency.
            infr.loop_phase = 'negredun'
            yield from infr.neg_redun_gen()

        infr.phase = 4
        infr.print('Phase 4 - Terminate', 1, color='red')
        infr.print('Exiting main loop')

        if infr.params['inference.enabled']:
            infr.assert_consistency_invariant()
        return 'finished:main'

    def hardcase_review_gen(infr):
        """
        Subiterator for hardcase review.

        Re-review non-confident edges that vsone did not classify correctly.
        Every edge that currently has a POSTV/NEGTV/INCMP decision is
        prioritized by ``hardness = 1 - easiness`` according to the learned
        ``match_state`` evaluation verifier.

        Raises:
            AssertionError: if autoreview, redundancy, or ranking is enabled,
                or if inference is disabled (this loop is only meant to run
                with that configuration).
        """
        infr.print('==============================', color='white')
        infr.print('--- HARDCASE PRIORITY LOOP ---', color='white')

        verifiers = infr.learn_evaluation_verifiers()
        verif = verifiers['match_state']

        # Only consider edges that already carry a definite decision.
        labeled_states = {POSTV, NEGTV, INCMP}
        edges_ = list(infr.edges())
        real_ = list(infr.edge_decision_from(edges_))
        flags_ = [r in labeled_states for r in real_]
        real = ut.compress(real_, flags_)
        edges = ut.compress(edges_, flags_)

        hardness = 1 - verif.easiness(edges, real)

        # Diagnostic table: hardest edges first. ``sort_values`` returns a new
        # frame, so the result must be reassigned for the printout to be sorted.
        df = pd.DataFrame({'edges': edges, 'real': real})
        df['hardness'] = hardness
        pred = verif.predict(edges)
        df['pred'] = pred.values
        df = df.sort_values('hardness', ascending=False)
        infr.print('hardness analysis')
        infr.print(str(df))

        infr.print('infr status: ' + ut.repr4(infr.status()))

        # This should only be run with certain params
        assert not infr.params['autoreview.enabled']
        assert not infr.params['redun.enabled']
        assert not infr.params['ranking.enabled']
        assert infr.params['inference.enabled']
        # const.CONFIDENCE.CODE.PRETTY_SURE
        if infr.params['queue.conf.thresh'] is None:
            # != 'pretty_sure':
            infr.print('WARNING: should queue.conf.thresh = "pretty_sure"?')

        # work around add_candidate_edges
        infr.prioritize(metric='hardness', edges=edges, scores=hardness)
        infr.set_edge_attrs('hardness', ut.dzip(edges, hardness))
        yield from infr._inner_priority_gen(use_refresh=False)

    def ranked_list_gen(infr, use_refresh=True):
        """
        Subiterator for phase1 of the main algorithm.

        Calls the underlying ranking algorithm and prioritizes the results.
        Returns immediately (yielding nothing) when the ranking step finds no
        new candidate edges.

        Args:
            use_refresh (bool): if truthy, clear the refresh criterion before
                the inner loop and let it stop the inner loop early.
        """
        infr.print('============================', color='white')
        infr.print('--- RANKED LIST LOOP ---', color='white')
        n_prioritized = infr.refresh_candidate_edges()
        if n_prioritized == 0:
            infr.print('RANKING ALGO FOUND NO NEW EDGES')
            return
        if use_refresh:
            infr.refresh.clear()
        yield from infr._inner_priority_gen(use_refresh)

    def incon_recovery_gen(infr):
        """
        Subiterator for recovery mode of the main algorithm.

        Iterates until the graph is consistent.

        Note:
            inconsistency recovery is implicitly handled by the main
            algorithm, so other phases do not need to call this explicitly.
            This exists for the case where the only mode we wish to run is
            inconsistency recovery.
        """
        maybe_error_edges = list(infr.maybe_error_edges())
        if len(maybe_error_edges) == 0:
            return
        infr.print('============================', color='white')
        infr.print('--- INCON RECOVER LOOP ---', color='white')
        infr.queue.clear()
        infr.add_candidate_edges(maybe_error_edges)
        yield from infr._inner_priority_gen(use_refresh=False)

    def pos_redun_gen(infr):
        """
        Subiterator for phase2 of the main algorithm.

        Searches for decisions that would complete positive redundancy. Each
        outer iteration clears the queue, enqueues the current positive
        redundancy candidates, and runs the inner priority loop; it stops once
        no new candidates are found.

        Doctest:
            >>> from wbia.algo.graph.mixin_loops import *
            >>> import wbia
            >>> infr = wbia.AnnotInference('PZ_MTEST', aids='all',
            >>>                            autoinit='staging', verbose=4)
            >>> #infr.load_published()
            >>> gen = infr.pos_redun_gen()
            >>> feedback = next(gen)
        """
        infr.print('===========================', color='white')
        infr.print('--- POSITIVE REDUN LOOP ---', color='white')
        # FIXME: should prioritize inconsistencies first

        def serial_gen():
            # Compute all candidates at once and enqueue them. (A threaded
            # variant was tried but its helper only exists in a disabled
            # ``if False:`` block, so it is not offered here.)
            new_edges = list(infr.find_pos_redun_candidate_edges())
            if len(new_edges) > 0:
                infr.add_candidate_edges(new_edges)
                yield new_edges

        def filtered_gen():
            # Hook for filtering candidate batches before review. Both filters
            # below are currently disabled via ``if False and ...``.
            candgen = serial_gen()

            include_filter_set = None
            if False and PRINCETON_KAIA_EDGE_LIST is not None:
                # Sanity check, make sure that one of the edges is in the
                # tier 1 dataset
                include_filter_set = set(PRINCETON_KAIA_EDGE_LIST)

            for new_edges in candgen:
                if False and infr.ibs is not None:
                    ibs = infr.ibs
                    qual_edges = ibs.unflat_map(ibs.get_annot_quality_int, new_edges)
                    valid_edges = []
                    for (u, v), (q1, q2) in zip(new_edges, qual_edges):
                        # Skip edges involving qualities less than ok
                        if q1 is not None and q1 < ibs.const.QUAL.OK:
                            continue
                        if q2 is not None and q2 < ibs.const.QUAL.OK:
                            continue
                        if include_filter_set is not None:
                            if (
                                u not in include_filter_set
                                and v not in include_filter_set
                            ):
                                continue
                        valid_edges.append((u, v))
                    if len(valid_edges) > 0:
                        yield valid_edges
                else:
                    yield new_edges

        for count in it.count(0):
            infr.print('check pos-redun iter {}'.format(count))
            infr.queue.clear()

            found_any = False
            try:
                for new_edges in filtered_gen():
                    found_any = True
                    yield from infr._inner_priority_gen(use_refresh=False)
            except (RuntimeError, StopIteration):
                # Since PEP 479 a StopIteration escaping a generator is
                # re-raised as RuntimeError, so both are treated as "done".
                logger.info(
                    'StopIteration in pos_redun_gen, found_any = {!r}'.format(found_any)
                )
                break

            if not found_any:
                break

            infr.print('not pos-reduntant yet.', color='white')
        infr.print('pos-redundancy achieved in {} iterations'.format(count + 1))

    def neg_redun_gen(infr):
        """
        Subiterator for phase3 of the main algorithm.

        Searches for decisions that would complete negative redundancy.
        Candidates are enqueued in chunks of 500 for faster response time; if
        ``infr.params['redun.neg.only_auto']`` is set, manual reviews are
        skipped unless the graph is inconsistent.
        """
        infr.print('===========================', color='white')
        infr.print('--- NEGATIVE REDUN LOOP ---', color='white')

        infr.queue.clear()

        only_auto = infr.params['redun.neg.only_auto']

        # TODO: outer loop that re-iterates until negative redundancy is
        # accomplished.
        needs_neg_redun = infr.find_neg_redun_candidate_edges()
        chunksize = 500
        for new_edges in ub.chunks(needs_neg_redun, chunksize):
            infr.print('another neg redun chunk')
            # Add chunks in a little at a time for faster response time
            infr.add_candidate_edges(new_edges)
            yield from infr._inner_priority_gen(use_refresh=False, only_auto=only_auto)

    def _inner_priority_gen(infr, use_refresh=False, only_auto=False):
        """
        Helper function that implements the general inner priority loop.

        Executes reviews until the queue is empty or needs refresh.

        Args:
            use_refresh (bool): if True enables the refresh criteria.
                (set to True in Phase 1)
            only_auto (bool): if True, then the user wont be prompted with
                reviews unless the graph is inconsistent.
                (set to True in Phase 3)

        Yields:
            The result of ``infr.emit_manual_review`` whenever an edge cannot
            be reviewed automatically and simulation mode is off.

        Notes:
            The caller is responsible for populating the priority queue. This
            will iterate until the queue is empty or the refresh criterion is
            triggered.
        """
        if infr.refresh:
            infr.refresh.enabled = use_refresh
        infr.print('Start inner loop with {} items in the queue'.format(len(infr.queue)))
        for count in it.count(0):
            if infr.is_recovering():
                infr.print(
                    'Still recovering after %d iterations' % (count,),
                    3,
                    color='brightcyan',
                )
            else:
                # Do not check for refresh if we are recovering
                if use_refresh and infr.refresh.check():
                    infr.print(
                        'Triggered refresh criteria after %d iterations' % (count,),
                        1,
                        color='yellow',
                    )
                    break

            # If the queue is empty break
            if len(infr.queue) == 0:
                infr.print(
                    'No more edges after %d iterations, need refresh' % (count,),
                    1,
                    color='yellow',
                )
                break

            # Try to automatically do the next review. ``peek`` is expected to
            # return an (edge, priority) pair; a TypeError from unpacking is
            # treated as "nothing left".
            try:
                edge, priority = infr.peek()
            except TypeError:
                infr.print(
                    'Nothing to peek',
                    1,
                    color='yellow',
                )
                break
            infr.print('next_review. edge={}'.format(edge), 100)
            inconsistent = infr.is_recovering(edge)

            feedback = None
            if infr.params['autoreview.enabled'] and not inconsistent:
                # Try to autoreview if we aren't in an inconsistent state
                feedback = infr.try_auto_review(edge)

            if feedback is not None:
                # Add feedback from the automated method
                infr.add_feedback(edge, priority=priority, **feedback)
            elif only_auto and not inconsistent:
                # We are in auto only mode, skip manual review
                # unless there is an inconsistency
                infr.skip(edge)
            elif infr.simulation_mode:
                # Use oracle feedback
                feedback = infr.request_oracle_review(edge)
                infr.add_feedback(edge, priority=priority, **feedback)
            else:
                # Yield to the user if we need to pause
                yield infr.emit_manual_review(edge, priority)

            if infr.metrics_list:
                infr._print_previous_loop_statistics(count)

    def init_refresh(infr):
        """Create ``infr.refresh`` from the ``refresh`` parameter subgroup."""
        refresh_params = infr.subparams('refresh')
        infr.refresh = RefreshCriteria(**refresh_params)

    def start_id_review(infr, max_loops=None, use_refresh=None):
        """
        Create the main generator and store it in ``infr._gen``.

        The generator is not advanced here; callers step it (e.g. via
        ``resume``).

        NOTE(review): ``use_refresh`` defaults to None and is forwarded as-is
        to ``main_gen``, where it behaves as falsy (refresh disabled), unlike
        ``main_gen``'s own default of True.

        Raises:
            AssertionError: if a generator is already running.
        """
        assert infr._gen is None, 'algo already running'
        infr._gen = infr.main_gen(max_loops=max_loops, use_refresh=use_refresh)

    def main_loop(infr, max_loops=None, use_refresh=True):
        """
        DEPRECATED: use ``list(infr.main_gen())`` instead, or
        ``assert not any(infr.main_gen())``.

        Raises:
            RuntimeError: always.
        """
        raise RuntimeError(
            'main_loop is deprecated; iterate infr.main_gen() instead '
            '(e.g. assert not any(infr.main_gen()))'
        )
class InfrReviewers(object):
    """
    Mixin providing the review entry points used by the main loop:
    automatic review, oracle review, manual-review requests, and acceptance
    of user feedback.
    """

    @profile
    def try_auto_review(infr, edge):
        """
        Attempt to review ``edge`` automatically from verifier probabilities.

        An automatic decision is made only when exactly one ``match_state``
        class probability exceeds its threshold in ``infr.task_thresh`` and
        the edge is not flagged by the ``photobomb_state`` task.

        Returns:
            dict | None: a review dict (``user_id='algo:auto_clf'``) whose
            ``evidence_decision`` is the single class that passed its
            threshold, or None if no automatic decision can be made.
        """
        review = {
            'user_id': 'algo:auto_clf',
            'confidence': const.CONFIDENCE.CODE.PRETTY_SURE,
            'evidence_decision': None,
            'meta_decision': NULL,
            'timestamp_s1': None,
            'timestamp_c1': None,
            'timestamp_c2': None,
            'tags': [],
        }
        if infr.is_recovering():
            # Do not autoreview if we are in an inconsistent state
            infr.print('Must manually review inconsistent edge', 3)
            return None
        # Determine if anything passes the match threshold
        primary_task = 'match_state'

        try:
            decision_probs = infr.task_probs[primary_task][edge]
        except KeyError:
            if infr.verifiers is None:
                return None
            if infr.verifiers.get(primary_task, None) is None:
                return None
            # Compute probs if they haven't been done yet
            infr.ensure_priority_scores([edge])
            try:
                decision_probs = infr.task_probs[primary_task][edge]
            except KeyError:
                return None

        primary_thresh = infr.task_thresh[primary_task]
        decision_flags = {
            k: decision_probs[k] > thresh for k, thresh in primary_thresh.items()
        }
        hasone = sum(decision_flags.values()) == 1
        auto_flag = False
        if hasone:
            try:
                # Check to see if it might be confounded by a photobomb
                pb_probs = infr.task_probs['photobomb_state'][edge]
                pb_thresh = infr.task_thresh['photobomb_state']['pb']
                confounded = pb_probs['pb'] > pb_thresh
            except KeyError:
                logger.info('Warning: confounding task probs not set (i.e. photobombs)')
                confounded = False
            if not confounded:
                # Use the one class that actually passed its threshold. With
                # thresholds below 0.5 the raw argmax of the probabilities can
                # be a different class that did NOT pass.
                evidence_decision = next(
                    k for k, passed in decision_flags.items() if passed
                )
                review['evidence_decision'] = evidence_decision
                truth = infr.match_state_gt(edge)
                if review['evidence_decision'] != truth:
                    infr.print(
                        'AUTOMATIC ERROR edge={}, truth={}, decision={}, probs={}'.format(
                            edge, truth, review['evidence_decision'], decision_probs
                        ),
                        2,
                        color='red',
                    )
                auto_flag = True
        if auto_flag and infr.verbose > 1:
            infr.print('Automatic review success')

        if auto_flag:
            return review
        else:
            return None

    def request_oracle_review(infr, edge, **kw):
        """Return feedback for ``edge`` from ``infr.oracle`` given the ground truth."""
        truth = infr.match_state_gt(edge)
        feedback = infr.oracle.review(edge, truth, infr, **kw)
        return feedback

    def _make_review_tuple(infr, edge, priority=None):
        """
        Makes tuple to be sent back to the user.

        Returns:
            tuple: ``(edge, priority, edge_data)`` where ``edge_data`` is the
            edge's non-visual data augmented with ``nid_edge``, ``queue_len``
            and ``n_ccs`` (sizes of the positive components of both ends).
        """
        edge_data = infr.get_nonvisual_edge_data(edge, on_missing='default')
        # Extra information
        edge_data['nid_edge'] = infr.pos_graph.node_labels(*edge)
        if infr.queue is None:
            edge_data['queue_len'] = 0
        else:
            edge_data['queue_len'] = len(infr.queue)
        edge_data['n_ccs'] = (
            len(infr.pos_graph.connected_to(edge[0])),
            len(infr.pos_graph.connected_to(edge[1])),
        )
        return (edge, priority, edge_data)

    def emit_manual_review(infr, edge, priority=None):
        """
        Emits a signal containing edges that need review.

        The request is a list of review tuples: ``edge`` first (the most
        important), followed by up to ``manual.n_peek`` other queued edges.
        If a ``request_review`` callback is registered it is called with the
        request; the request is also returned.
        """
        infr.print('emit_manual_review', 100)
        # Emit a list of reviews that can be considered.
        # The first is the most important
        user_request = [infr._make_review_tuple(edge, priority)]
        for other_edge, other_priority in infr.peek_many(infr.params['manual.n_peek']):
            if other_edge == edge:
                continue
            user_request.append(infr._make_review_tuple(other_edge, other_priority))

        # If registered, send the request via a callback.
        request_review = infr.callbacks.get('request_review', None)
        if request_review is not None:
            # Send these reviews to a user
            request_review(user_request)
        # Otherwise the current process must handle the request by return value
        return user_request

    def skip(infr, edge):
        """Remove ``edge`` from the queue if present (best effort)."""
        infr.print('skipping edge={}'.format(edge), 100)
        try:
            del infr.queue[edge]
        except Exception:
            # Deliberately best-effort: a missing edge is not an error here.
            pass

    def accept(infr, feedback):
        """
        Called when user has completed feedback from qt or web.

        Optional ``annot1_state`` / ``annot2_state`` entries are applied as
        node feedback; the remaining keys are passed to ``add_feedback``. The
        caller's dict is not modified.
        """
        # Work on a shallow copy so popping the annot states does not mutate
        # the dict owned by the caller.
        feedback = dict(feedback)
        annot1_state = feedback.pop('annot1_state', None)
        annot2_state = feedback.pop('annot2_state', None)
        if annot1_state:
            infr.add_node_feedback(**annot1_state)
        if annot2_state:
            infr.add_node_feedback(**annot2_state)
        infr.add_feedback(**feedback)

        if infr.params['manual.autosave']:
            infr.write_wbia_staging_feedback()

    def resume(infr):
        """
        Advance the running generator (``infr._gen``) under ``infr._gen_lock``.

        Returns:
            The next user request, ``'finished:stopped'`` if no generator is
            running, or ``'finished:stopiteration'`` once the generator is
            exhausted (after calling the ``review_finished`` callback, if any).
        """
        with infr._gen_lock:
            infr.print('continue_review', 10)
            if infr._gen is None:
                return 'finished:stopped'
            try:
                user_request = next(infr._gen)
            except StopIteration:
                review_finished = infr.callbacks.get('review_finished', None)
                if review_finished is not None:
                    review_finished()
                infr._gen = None
                user_request = 'finished:stopiteration'
            return user_request

    def qt_edge_reviewer(infr, edge=None):
        """Create (and store in ``infr.manual_wgt``) a Qt pair-review dialog."""
        import wbia.guitool as gt

        gt.ensure_qapp()
        from wbia.viz import viz_graph2

        infr.manual_wgt = viz_graph2.AnnotPairDialog(
            edge=edge, infr=infr, standalone=False, cfgdict=infr.verifier_params
        )
        if edge is not None:
            infr.manual_wgt.seek(0)
        return infr.manual_wgt

    def qt_review_loop(infr):
        r"""
        TODO: The loop parts should be a non-mixin class

        Qt review loop entry point

        CommandLine:
            python -m wbia.algo.graph.mixin_loops qt_review_loop --show

        Example:
            >>> # SCRIPT
            >>> import utool as ut
            >>> import wbia
            >>> ibs = wbia.opendb('PZ_MTEST')
            >>> infr = wbia.AnnotInference(ibs, 'all', autoinit=True)
            >>> infr.ensure_mst()
            >>> # Add dummy priorities to each edge
            >>> infr.set_edge_attrs('prob_match', ut.dzip(infr.edges(), [1]))
            >>> infr.prioritize('prob_match', infr.edges(), reset=True)
            >>> infr.params['redun.enabled'] = False
            >>> win = infr.qt_review_loop()
            >>> import wbia.guitool as gt
            >>> gt.qtapp_loop(qwin=win, freq=10)
        """
        infr.qt_edge_reviewer()
        return infr.manual_wgt
if False:
    # Disabled experiment: generate candidate edges in a background thread.
    from threading import Thread

    # Marker object the producer puts on the queue to signal exhaustion.
    _sentinel = object()

    class _background_consumer(Thread):  # NOQA
        """
        Will fill the queue with content of the source in a separate thread.

        Each item is also added to ``infr`` via ``add_candidate_edges``
        (probably not thread safe). The sentinel is always posted when the
        source ends -- including when it raises -- so the consumer never
        blocks forever.

        Ignore:
            >>> from wbia.algo.graph.mixin_loops import *
            >>> import wbia
            >>> infr = wbia.AnnotInference('PZ_MTEST', aids='all',
            >>>                            autoinit='staging', verbose=4)
            >>> infr.load_published()
            >>> gen = infr.find_pos_redun_candidate_edges()
            >>> parbuf = buffered_add_candidate_edges(infr, 3, gen)
            >>> next(parbuf)
        """

        def __init__(self, infr, queue, source):
            Thread.__init__(self)
            self.infr = infr
            self._queue = queue
            self._source = source

        def run(self):
            try:
                for item in self._source:
                    # probably not thread safe
                    infr = self.infr
                    infr.add_candidate_edges([item])
                    self._queue.put(item, block=True)
            finally:
                # Signal the consumer we are done.
                self._queue.put(_sentinel)

    class buffered_add_candidate_edges(object):  # NOQA
        """
        Buffers content of an iterator polling the contents of the given
        iterator in a separate thread.

        When the consumer is faster than many producers, this kind of
        concurrency and buffering makes sense.

        The size parameter is the number of elements to buffer.

        The source must be threadsafe.
        """

        def __init__(self, infr, size, source):
            from queue import Queue

            self._queue = Queue(size)
            self._poller = _background_consumer(infr, self._queue, source)
            self._poller.daemon = True
            self._poller.start()

        def __iter__(self):
            return self

        def __next__(self):
            item = self._queue.get(True)
            if item is _sentinel:
                # Iterator protocol: exhaustion must raise StopIteration
                # (returning None would make the iterator never terminate).
                raise StopIteration
            return item

        next = __next__