# -*- coding: utf-8 -*-
import logging
from wbia.algo.graph import demo
import utool as ut
from wbia.algo.graph.state import POSTV, NEGTV, INCMP, UNREV # NOQA
from wbia.algo.graph.state import SAME, DIFF, NULL # NOQA
print, rrr, profile = ut.inject2(__name__)
logger = logging.getLogger('wbia')
[docs]def do_infr_test(ccs, edges, new_edges):
"""
Creates a graph with `ccs` + `edges` and then adds `new_edges`
"""
# import networkx as nx
import wbia.plottool as pt
infr = demo.make_demo_infr(ccs, edges)
if ut.show_was_requested():
pt.qtensure()
# Preshow
fnum = 1
if ut.show_was_requested():
infr.set_node_attrs('shape', 'circle')
infr.show(
pnum=(2, 1, 1),
fnum=fnum,
show_unreviewed_edges=True,
show_reviewed_cuts=True,
splines='spline',
show_inferred_diff=True,
groupby='name_label',
show_labels=True,
pickable=True,
)
pt.set_title('pre-review')
pt.gca().set_aspect('equal')
infr.set_node_attrs('pin', 'true')
# fig1 = pt.gcf()
# fig1.canvas.mpl_connect('pick_event', ut.partial(on_pick, infr=infr))
infr1 = infr
infr2 = infr.copy()
for new_edge in new_edges:
aid1, aid2, data = new_edge
evidence_decision = data['evidence_decision']
infr2.add_feedback((aid1, aid2), evidence_decision)
infr2.relabel_using_reviews(rectify=False)
infr2.apply_nondynamic_update()
# Postshow
if ut.show_was_requested():
infr2.show(
pnum=(2, 1, 2),
fnum=fnum,
show_unreviewed_edges=True,
show_inferred_diff=True,
show_labels=True,
)
pt.gca().set_aspect('equal')
pt.set_title('post-review')
# fig2 = pt.gcf()
# if fig2 is not fig1:
# fig2.canvas.mpl_connect('pick_event', ut.partial(on_pick, infr=infr2))
class Checker(object):
"""
Asserts pre and post test properties of the graph
"""
def __init__(self, infr1, infr2):
self._errors = []
self.infr1 = infr1
self.infr2 = infr2
def __call__(self, infr, u, v, key, val, msg):
data = infr.get_nonvisual_edge_data((u, v))
if data is None:
assert infr.graph.has_edge(u, v), 'uv=%r, %r does not exist' % (u, v)
got = data.get(key)
if got != val:
msg1 = 'key=%s %r!=%r, ' % (key, got, val)
errmsg = ''.join(
[
msg1,
msg,
'\nedge=',
ut.repr2((u, v)),
'\n',
infr.repr_edge_data(data),
]
)
self._errors.append(errmsg)
def custom_precheck(self, func):
try:
func(self.infr1)
except AssertionError as ex:
self._errors.append(str(ex))
def after(self, errors=[]):
"""
Delays error reporting until after visualization
prints errors, then shows you the graph, then
finally if any errors were discovered they are raised
"""
errors = errors + self._errors
if errors:
ut.cprint('PRINTING %d FAILURE' % (len(errors)), 'red')
for msg in errors:
logger.info(msg)
ut.cprint('HAD %d FAILURE' % (len(errors)), 'red')
if ut.show_was_requested():
pt.all_figures_tile(percent_w=0.5)
ut.show_if_requested()
if errors:
raise AssertionError('There were errors')
check = Checker(infr1, infr2)
return infr1, infr2, check
[docs]def case_negative_infr():
"""
CommandLine:
python -m wbia.algo.graph.tests.dyn_cases case_negative_infr --show
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.tests.dyn_cases import * # NOQA
>>> case_negative_infr()
"""
# Initial positive reviews
ccs = [[1, 2, 3], [9]]
# Add in initial reviews
edges = [
(9, 7, {'evidence_decision': NEGTV}),
(1, 7, {'inferred_state': None}),
(1, 9, {'inferred_state': None}),
]
# Add in scored, but unreviewed edges
new_edges = [(3, 9, {'evidence_decision': NEGTV})]
infr1, infr2, check = do_infr_test(ccs, edges, new_edges)
check(
infr2,
1,
7,
'inferred_state',
None,
'negative review of an edge should not jump more than one component',
)
check(
infr2,
1,
9,
'inferred_state',
'diff',
'negative review of an edge should cut within one jump',
)
check.after()
[docs]def case_match_infr():
"""
CommandLine:
python -m wbia.algo.graph.tests.dyn_cases case_match_infr --show
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.tests.dyn_cases import * # NOQA
>>> case_match_infr()
"""
# Initial positive reviews
ccs = [[2], [7], [9, 10]]
# Add in initial reviews
edges = [
(9, 8, {'evidence_decision': NEGTV}),
(7, 2, {}),
]
# Add in scored, but unreviewed edges
edges += [
(2, 8, {'inferred_state': None}),
(2, 9, {'inferred_state': None}),
]
new_edges = [(2, 10, {'evidence_decision': POSTV})]
infr1, infr2, check = do_infr_test(ccs, edges, new_edges)
# Checks out of cc inferences
check(infr2, 2, 9, 'inferred_state', 'same', 'should infer a match')
check(infr2, 2, 8, 'inferred_state', 'diff', 'should infer a negative')
check(infr1, 2, 7, 'inferred_state', None, 'discon should have inference')
check(infr2, 2, 7, 'inferred_state', None, 'discon should have inference')
check.after()
[docs]def case_inconsistent():
"""
CommandLine:
python -m wbia.algo.graph.tests.dyn_cases case_inconsistent --show
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.tests.dyn_cases import * # NOQA
>>> case_inconsistent()
"""
ccs = [[1, 2], [3, 4, 5]] # [6, 7]]
edges = [
(2, 3, {'evidence_decision': NEGTV}),
]
edges += [
(4, 1, {'inferred_state': None}),
# (2, 7, {'inferred_state': None}),
]
new_edges = [(1, 5, {'evidence_decision': POSTV})]
infr1, infr2, check = do_infr_test(ccs, edges, new_edges)
# Make sure the previously inferred edge is no longer inferred
check(infr1, 4, 1, 'inferred_state', 'diff', 'should initially be an inferred diff')
check(
infr2,
4,
1,
'inferred_state',
'inconsistent_internal',
'should not be inferred after incon',
)
check(infr2, 4, 3, 'maybe_error', True, 'need to have a maybe split')
check.after()
[docs]def case_redo_incon():
"""
CommandLine:
python -m wbia.algo.graph.tests.dyn_cases case_redo_incon --show
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.tests.dyn_cases import * # NOQA
>>> case_redo_incon()
"""
ccs = [[1, 2], [3, 4]] # [6, 7]]
edges = [
(2, 3, {'evidence_decision': NEGTV}),
(1, 4, {'evidence_decision': NEGTV}),
]
edges += []
new_edges = [(2, 3, {'evidence_decision': POSTV})]
infr1, infr2, check = do_infr_test(ccs, edges, new_edges)
maybe_splits = infr2.get_edge_attrs('maybe_error', default=None)
logger.info('maybe_splits = %r' % (maybe_splits,))
if not any(maybe_splits.values()):
ut.cprint('FAILURE', 'red')
logger.info('At least one edge should be marked as a split')
check.after()
[docs]def case_override_inference():
"""
CommandLine:
python -m wbia.algo.graph.tests.dyn_cases case_override_inference --show
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.tests.dyn_cases import * # NOQA
>>> case_override_inference()
"""
ccs = [[1, 2, 3, 4, 5]]
edges = [
(1, 3, {'inferred_state': 'same'}),
(1, 4, {'inferred_state': 'same'}),
# (1, 5, {'inferred_state': 'same'}),
(1, 2, {'inferred_state': 'same', 'num_reviews': 2}),
(2, 3, {'inferred_state': 'same', 'num_reviews': 2}),
(2, 4, {'inferred_state': 'same'}),
(2, 5, {'inferred_state': 'same'}),
(3, 4, {'inferred_state': 'same', 'num_reviews': 100}),
(4, 5, {'inferred_state': 'same', 'num_reviews': 0.01}),
]
edges += []
new_edges = [
(1, 5, {'evidence_decision': NEGTV}),
(5, 2, {'evidence_decision': NEGTV}),
]
infr1, infr2, check = do_infr_test(ccs, edges, new_edges)
# Make sure that the inferred edges are no longer inferred when an
# inconsistent case is introduced
check(infr2, 1, 4, 'maybe_error', None, 'should not split inferred edge')
check(infr2, 4, 5, 'maybe_error', True, 'split me')
check(
infr2,
5,
2,
'inferred_state',
'inconsistent_internal',
'inference should be overriden',
)
check.after()
[docs]def case_undo_match():
"""
CommandLine:
python -m wbia.algo.graph.tests.dyn_cases case_undo_match --show
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.tests.dyn_cases import * # NOQA
>>> case_undo_match()
"""
ccs = [[1, 2]]
edges = []
new_edges = [(1, 2, {'evidence_decision': NEGTV})]
infr1, infr2, check = do_infr_test(ccs, edges, new_edges)
check(infr2, 1, 2, 'inferred_state', 'diff', 'should have cut edge')
check.after()
[docs]def case_undo_negative():
"""
CommandLine:
python -m wbia.algo.graph.tests.dyn_cases case_undo_negative --show
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.tests.dyn_cases import * # NOQA
>>> case_undo_negative()
"""
ccs = [[1], [2]]
edges = [
(1, 2, {'evidence_decision': NEGTV}),
]
new_edges = [(1, 2, {'evidence_decision': POSTV})]
infr1, infr2, check = do_infr_test(ccs, edges, new_edges)
check(infr2, 1, 2, 'inferred_state', 'same', 'should have matched edge')
check.after()
[docs]def case_incon_removes_inference():
"""
CommandLine:
python -m wbia.algo.graph.tests.dyn_cases case_incon_removes_inference --show
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.tests.dyn_cases import * # NOQA
>>> case_incon_removes_inference()
"""
ccs = [[1, 2, 3], [4, 5, 6]]
edges = [
(3, 4, {'evidence_decision': NEGTV}),
(1, 5, {'evidence_decision': NEGTV}),
(2, 5, {}),
(1, 6, {}),
]
new_edges = [(3, 4, {'evidence_decision': POSTV})]
infr1, infr2, check = do_infr_test(ccs, edges, new_edges)
check(infr1, 2, 5, 'inferred_state', 'diff', 'should be preinferred')
check(
infr2,
2,
5,
'inferred_state',
'inconsistent_internal',
'should be uninferred on incon',
)
check.after()
[docs]def case_inferable_notcomp1():
"""
make sure notcomparable edges can be inferred
CommandLine:
python -m wbia.algo.graph.tests.dyn_cases case_inferable_notcomp1 --show
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.tests.dyn_cases import * # NOQA
>>> case_inferable_notcomp1()
"""
ccs = [[1, 2], [3, 4]]
edges = [
(2, 3, {'evidence_decision': NEGTV}),
]
new_edges = [(1, 4, {'evidence_decision': INCMP})]
infr1, infr2, check = do_infr_test(ccs, edges, new_edges)
check(infr2, 1, 4, 'inferred_state', 'diff', 'should be inferred')
check.after()
[docs]def case_inferable_update_notcomp():
"""
make sure inference updates for nocomparable edges
CommandLine:
python -m wbia.algo.graph.tests.dyn_cases case_inferable_update_notcomp --show
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.tests.dyn_cases import * # NOQA
>>> case_inferable_update_notcomp()
"""
ccs = [[1, 2], [3, 4]]
edges = [
(2, 3, {'evidence_decision': NEGTV}),
(1, 4, {'evidence_decision': INCMP}),
]
new_edges = [(2, 3, {'evidence_decision': POSTV})]
infr1, infr2, check = do_infr_test(ccs, edges, new_edges)
check(infr1, 1, 4, 'inferred_state', 'diff', 'should be inferred diff')
check(infr2, 1, 4, 'inferred_state', 'same', 'should be inferred same')
check.after()
[docs]def case_notcomp_remove_infr():
"""
CommandLine:
python -m wbia.algo.graph.tests.dyn_cases case_notcomp_remove_infr --show
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.tests.dyn_cases import * # NOQA
>>> case_notcomp_remove_infr()
"""
ccs = [[1, 2, 3], [4, 5, 6]]
edges = [
(1, 4, {'evidence_decision': POSTV}),
# (1, 4, {'evidence_decision': INCMP}),
(2, 5, {'evidence_decision': INCMP}),
(3, 6, {'evidence_decision': INCMP}),
]
new_edges = [(1, 4, {'evidence_decision': INCMP})]
infr1, infr2, check = do_infr_test(ccs, edges, new_edges)
check(infr2, 1, 4, 'inferred_state', INCMP, 'can not infer match here!')
check(infr2, 2, 5, 'inferred_state', INCMP, 'can not infer match here!')
check(infr2, 3, 6, 'inferred_state', INCMP, 'can not infer match here!')
check.after()
[docs]def case_notcomp_remove_cuts():
"""
CommandLine:
python -m wbia.algo.graph.tests.dyn_cases case_notcomp_remove_cuts --show
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.tests.dyn_cases import * # NOQA
>>> case_notcomp_remove_cuts()
"""
ccs = [[1, 2, 3], [4, 5, 6]]
edges = [
(1, 4, {'evidence_decision': NEGTV}),
# (1, 4, {'evidence_decision': INCMP}),
(2, 5, {'evidence_decision': INCMP}),
(3, 6, {'evidence_decision': INCMP}),
]
new_edges = [(1, 4, {'evidence_decision': INCMP})]
infr1, infr2, check = do_infr_test(ccs, edges, new_edges)
check(infr1, 1, 4, 'inferred_state', 'diff', 'should infer diff!')
check(infr1, 2, 5, 'inferred_state', 'diff', 'should infer diff!')
check(infr1, 3, 6, 'inferred_state', 'diff', 'should infer diff!')
check(infr2, 1, 4, 'evidence_decision', INCMP, 'can not infer cut here!')
check(infr2, 2, 5, 'inferred_state', INCMP, 'can not infer cut here!')
check(infr2, 3, 6, 'inferred_state', INCMP, 'can not infer cut here!')
check.after()
[docs]def case_keep_in_cc_infr_post_negative():
"""
CommandLine:
python -m wbia.algo.graph.tests.dyn_cases case_keep_in_cc_infr_post_negative --show
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.tests.dyn_cases import * # NOQA
>>> case_keep_in_cc_infr_post_negative()
"""
ccs = [[1, 2, 3], [4]]
edges = [(1, 3), (1, 4), (2, 4), (3, 4)]
new_edges = [(4, 2, {'evidence_decision': NEGTV})]
infr1, infr2, check = do_infr_test(ccs, edges, new_edges)
check(infr1, 3, 4, 'inferred_state', None, 'should be no inference')
check(infr1, 1, 3, 'inferred_state', 'same', 'should be inferred')
check(infr2, 1, 3, 'inferred_state', 'same', 'should remain inferred')
check(infr2, 3, 4, 'inferred_state', 'diff', 'should become inferred')
check.after()
[docs]def case_keep_in_cc_infr_post_notcomp():
"""
CommandLine:
python -m wbia.algo.graph.tests.dyn_cases case_keep_in_cc_infr_post_notcomp --show
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.tests.dyn_cases import * # NOQA
>>> case_keep_in_cc_infr_post_notcomp()
"""
ccs = [[1, 2, 3], [4]]
edges = [(1, 3), (1, 4), (2, 4), (3, 4)]
new_edges = [(4, 2, {'evidence_decision': INCMP})]
infr1, infr2, check = do_infr_test(ccs, edges, new_edges)
check(infr1, 3, 4, 'inferred_state', None, 'should not be inferred')
check(infr1, 1, 3, 'inferred_state', 'same', 'should be inferred')
check(infr2, 1, 3, 'inferred_state', 'same', 'should remain inferred')
check(infr2, 3, 4, 'inferred_state', None, 'should not become inferred')
check.after()
[docs]def case_out_of_subgraph_modification():
"""
CommandLine:
python -m wbia.algo.graph.tests.dyn_cases case_out_of_subgraph_modification --show
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.tests.dyn_cases import * # NOQA
>>> case_out_of_subgraph_modification()
"""
# A case where a review between two ccs modifies state outside of
# the subgraph of ccs
ccs = [[1, 2], [3, 4], [5, 6]]
edges = [(2, 6), (4, 5, {'evidence_decision': NEGTV})]
new_edges = [(2, 3, {'evidence_decision': POSTV})]
infr1, infr2, check = do_infr_test(ccs, edges, new_edges)
check(infr1, 2, 6, 'inferred_state', None, 'should not be inferred')
check(infr2, 2, 6, 'inferred_state', 'diff', 'should be inferred')
check.after()
[docs]def case_flag_merge():
"""
CommandLine:
python -m wbia.algo.graph.tests.dyn_cases case_flag_merge --show
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.tests.dyn_cases import * # NOQA
>>> case_flag_merge()
"""
# A case where a review between two ccs modifies state outside of
# the subgraph of ccs
ccs = []
edges = [
(1, 2, {'evidence_decision': POSTV, 'num_reviews': 2}),
(4, 1, {'evidence_decision': POSTV, 'num_reviews': 1}),
(2, 4, {'evidence_decision': NEGTV, 'num_reviews': 1}),
]
# Ensure that the negative edge comes back as potentially in error
new_edges = [(1, 4, {'evidence_decision': POSTV})]
infr1, infr2, check = do_infr_test(ccs, edges, new_edges)
@check.custom_precheck
def check_pre_state(infr):
assert infr.nid_to_errors[1] == {(1, 4)}
check(infr1, 2, 4, 'maybe_error', None, 'match edge should flag first None')
check(infr1, 1, 4, 'maybe_error', True, 'match edge should flag first True')
check(infr2, 2, 4, 'maybe_error', True, 'negative edge should flag second True')
check(infr2, 1, 4, 'maybe_error', None, 'negative edge should flag second None')
check.after()
[docs]def case_all_types():
"""
CommandLine:
python -m wbia.algo.graph.tests.dyn_cases case_all_types --show
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.tests.dyn_cases import * # NOQA
>>> case_all_types()
"""
# A case where a review between two ccs modifies state outside of
# the subgraph of ccs
ccs = []
# Define edges within components
edges = [
# Inconsistent component
(11, 12, {'evidence_decision': POSTV}),
(12, 13, {'evidence_decision': POSTV}),
(11, 13, {'evidence_decision': NEGTV}),
(11, 14, {'evidence_decision': POSTV}),
(12, 14, {'evidence_decision': POSTV}),
(13, 14, {}),
(11, 15, {'evidence_decision': POSTV}),
(12, 15, {'evidence_decision': INCMP}),
# Positive component (with notcomp)
(21, 22, {'evidence_decision': POSTV}),
(22, 23, {'evidence_decision': POSTV}),
(21, 23, {'evidence_decision': INCMP}),
(21, 24, {'evidence_decision': POSTV}),
(22, 24, {'evidence_decision': POSTV}),
(23, 24, {}),
# Positive component (with unreview)
(31, 32, {'evidence_decision': POSTV}),
(32, 33, {'evidence_decision': POSTV}),
(31, 33, {'evidence_decision': POSTV}),
(31, 34, {'evidence_decision': POSTV}),
(32, 34, {'evidence_decision': POSTV}),
(33, 34, {}),
# Positive component
(41, 42, {'evidence_decision': POSTV}),
(42, 43, {'evidence_decision': POSTV}),
(41, 43, {'evidence_decision': POSTV}),
# Positive component (extra)
(51, 52, {'evidence_decision': POSTV}),
(52, 53, {'evidence_decision': POSTV}),
(51, 53, {'evidence_decision': POSTV}),
# Positive component (isolated)
(61, 62, {'evidence_decision': POSTV}),
]
# Define edges between components
edges += [
# 1 - 2
(11, 21, {}),
(12, 22, {}),
# 1 - 3
(11, 31, {}),
(12, 32, {'evidence_decision': NEGTV}),
(13, 33, {}),
# 1 - 4
(11, 41, {}),
(12, 42, {'evidence_decision': INCMP}),
(13, 43, {}),
# 1 - 5
(11, 51, {'evidence_decision': INCMP}),
(12, 52, {'evidence_decision': NEGTV}),
(13, 53, {}),
# 2 - 3
(21, 31, {'evidence_decision': INCMP}),
(22, 32, {}),
# 2 - 4
(21, 41, {}),
(22, 42, {}),
# 2 - 5
(21, 51, {'evidence_decision': INCMP}),
(22, 52, {'evidence_decision': NEGTV}),
# 3 - 4
(31, 41, {'evidence_decision': NEGTV}),
(32, 42, {}),
]
# Ensure that the negative edge comes back as potentially in error
# new_edges = [(2, 5, {'evidence_decision': POSTV})]
new_edges = []
infr1, infr2, check = do_infr_test(ccs, edges, new_edges)
errors = []
for u, v, d in infr2.graph.edges(data=True):
state = d.get('inferred_state', '')
if u < 20 or v < 20:
if state is not None and 'inconsistent' not in state:
logger.info('u, v, state = %r, %r, %r' % (u, v, state))
err = AssertionError('all of cc0 should be incon')
logger.info(err)
errors.append(err)
else:
if state is not None and 'inconsistent' in state:
logger.info('u, v, state = %r, %r, %r' % (u, v, state))
err = AssertionError('outside of cc0 should not be incon')
logger.info(err)
errors.append(err)
check(
infr1,
13,
14,
'inferred_state',
'inconsistent_internal',
'notcomp edge should be incon',
)
check(infr1, 21, 31, 'inferred_state', INCMP, 'notcomp edge should remain notcomp')
check(infr1, 22, 32, 'inferred_state', None, 'notcomp edge should transfer knowledge')
check(
infr1,
12,
42,
'inferred_state',
'inconsistent_external',
'inconsistency should override notcomp',
)
# check(infr1, 1, 4, 'maybe_error', True, 'match edge should flag first')
# check(infr2, 2, 4, 'maybe_error', True, 'negative edge should flag second')
# check(infr2, 1, 4, 'maybe_error', False, 'negative edge should flag second')
check.after(errors)