#!/usr/bin/env python2.7
# encoding: utf-8

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import sys
if __name__ == '__main__':
    print(
        "You probably wanted to execute run.py instead of this.",
        file=sys.stderr
    )
    sys.exit(1)

from collections import Counter
from collections import defaultdict
from collections import OrderedDict
from functools import partial
from operator import attrgetter
import random
import signal
from time import sleep
from datetime import datetime

from cachetools.func import lru_cache
import deap
import deap.base
import deap.tools
import numpy as np
from rdflib import BNode
from rdflib import Literal
from rdflib import URIRef
from rdflib import Variable
from rdflib import XSD
import SPARQLWrapper
from scoop.futures import map as parallel_map
import six

import logging
logger = logging.getLogger(__name__)

import logging_config
from cluster import expected_precision_loss_by_query_reduction
from cluster import cluster_gps_to_reduce_queries
import config
from exception import GPLearnerAbortException
from fusion import fuse_prediction_results
from fusion import train_fusion_models
from gp_query import ask_multi_query
from gp_query import calibrate_query_timeout
from gp_query import combined_ask_count_multi_query
from gp_query import predict_multi_query
from gp_query import predict_query
from gp_query import query_stats
from gp_query import query_time_hard_exceeded
from gp_query import query_time_soft_exceeded
from gp_query import variable_substitution_query
from graph_pattern import canonicalize
from graph_pattern import gen_random_var
from graph_pattern import GPFitness
from graph_pattern import GPFitnessTuple
from graph_pattern import GraphPattern
from graph_pattern import GraphPatternStats
from graph_pattern import replace_vars_with_random_vars
from graph_pattern import SOURCE_VAR
from graph_pattern import TARGET_VAR
from ground_truth_tools import get_semantic_associations
from ground_truth_tools import k_fold_cross_validation
from ground_truth_tools import split_training_test_set
from gtp_scores import GTPScores
from memory_usage import log_mem_usage
from serialization import find_last_result
from serialization import load_predicted_target_candidates
from serialization import save_predicted_target_candidates
from serialization import find_run_result
from serialization import format_graph_pattern
from serialization import load_init_patterns
from serialization import load_results
from serialization import pause_if_signaled_by_file
from serialization import print_graph_pattern
from serialization import print_population
from serialization import print_results
from serialization import remove_old_result_files
from serialization import save_generation
from serialization import save_results
from serialization import save_run
from serialization import set_symlink
from utils import exception_stack_catcher
from utils import kv_str
from utils import log_all_exceptions
from utils import log_wrapped_exception
from utils import sample_from_list

logger.info('init gp_learner')
signal.signal(signal.SIGUSR1, log_mem_usage)


def init_workers():
    parallel_map(_init_workers, range(1000))


def _init_workers(_):
    # dummy method that makes workers load all imports and config
    pass


def f_measure(precision, recall, beta=config.F_MEASURE_BETA):
    """Calculates the F_beta-measure (F1 for beta=1) from precision and
    recall."""
    if precision + recall <= 0:
        return 0.
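    # Worked example (illustrative, not from the original source): with
    # beta=1 and precision = recall = 0.5, the formula below yields
    # (1 + 1) * 0.5 * 0.5 / (1 * 0.5 + 0.5) = 0.5, i.e. the classic F1 score.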
    beta_sq = beta ** 2
    return (1 + beta_sq) * precision * recall / (beta_sq * precision + recall)


@exception_stack_catcher
def evaluate(sparql, timeout, gtp_scores, graph_pattern, run=0, gen=0):
    assert isinstance(graph_pattern, GraphPattern)
    assert isinstance(gtp_scores, GTPScores)
    ground_truth_pairs = gtp_scores.ground_truth_pairs
    remaining_gain = gtp_scores.remaining_gain
    gtp_max_precisions = gtp_scores.gtp_max_precisions
    complete_pattern = 1 if graph_pattern.complete() else 0
    pattern_length = len(graph_pattern)
    vars_in_graph_pattern = graph_pattern.vars_in_graph
    pattern_vars = len(vars_in_graph_pattern)
    logger.debug('evaluating %s', graph_pattern)

    # Check how many gt_matches (recall) and targets (precision) can be
    # reached. There are several cases here:
    # - ?source & ?target in pattern:
    #   we can use a combined_ask_count_multi_query to only use one query and
    #   get gt_matches and res_lengths at once
    # - only ?source in pattern:
    #   we can check how many gt_matches (recall) there are with
    #   ask_multi_query, but as we don't have a ?target variable we can't
    #   ever reach a target with this pattern, meaning we have
    #   res_lengths = [] --> precision = 0
    # - only ?target in pattern:
    #   we can check how many gt_matches (recall) there are with
    #   ask_multi_query. We could still get res_lengths with a count_query,
    #   which might be useful for universal targets?
    #   'select count(?target) as ?count', but it turns out this is a bad
    #   idea:
    #   - it gives incomplete ?target var patterns an advantage over
    #     ?source-only patterns (which will always have a precision of 0)
    #   - the precision of a ?target-var-only pattern prefers very narrow
    #     patterns (one could say over-fitting) which are only tailored
    #     towards singling out one specific ?target variable, as they can't
    #     rely on a ?source to actually help with the filtering. Example:
    #     (dbpedia-de:Pferd owl:sameAs ?target) (only returns dbpedia:Horse)
    # - none of the above in pattern: should never happen
    if complete_pattern:
        query_time, (gt_matches, res_lengths) = combined_ask_count_multi_query(
            sparql, timeout, graph_pattern, ground_truth_pairs)
    else:
        # run an ask_multi_query to see how many gt_matches (recall) we have
        query_time, gt_matches = ask_multi_query(
            sparql, timeout, graph_pattern, ground_truth_pairs)
        res_lengths = {gtp: 0 for gtp in ground_truth_pairs}

    matching_node_pairs = [
        gtp for gtp in ground_truth_pairs if gt_matches[gtp]
    ]
    # TODO: maybe punish patterns which produce results but don't match?

    # Judging query times:
    # We can't trust counts returned by timed-out queries. Nevertheless, we
    # want to give soft-timeout queries a slight advantage over complete
    # failures. For this we use the f_measure component, which still gets a
    # severe punishment below, while the individual counts (and via them gain
    # and score) are ignored in case of a timeout.
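    # Resulting trust levels of the mapping below (sketch): no timeout -> 1.0,
    # soft timeout -> 0.5, hard timeout -> 0.0.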
    qtime_exceeded = 0
    if query_time_hard_exceeded(query_time, timeout):
        qtime_exceeded = 1
    elif query_time_soft_exceeded(query_time, timeout):
        qtime_exceeded = .5
    trust = (1 - qtime_exceeded)

    sum_gt_matches = sum(gt_matches.values())
    recall = sum_gt_matches / len(ground_truth_pairs)
    non_zero_res_lens = [l for l in res_lengths.values() if l > 0]
    avg_res_len = sum(non_zero_res_lens) / max(len(non_zero_res_lens), 1)
    precision = 0
    if avg_res_len > 0:
        precision = 1 / avg_res_len
    fm = f_measure(precision, recall) * trust

    gtp_precisions = OrderedDict()
    gain = 0
    if qtime_exceeded == 0:
        for gtp in matching_node_pairs:
            gtp_res_len = res_lengths[gtp]
            if gtp_res_len > 0:
                gtp_precision = 1 / gtp_res_len
                gtp_precisions[gtp] = gtp_precision
                score_diff = gtp_precision - gtp_max_precisions[gtp]
                if score_diff > 0:
                    gain += score_diff

    # counter overfitting: punish patterns whose matches all share a single
    # source or a single target
    overfitting = 1
    if matching_node_pairs:
        m_sources, m_targets = zip(*matching_node_pairs)
        if len(set(m_sources)) < 2:
            overfitting *= config.OVERFITTING_PUNISHMENT
        if len(set(m_targets)) < 2:
            overfitting *= config.OVERFITTING_PUNISHMENT

    score = trust * overfitting * gain

    # order of res needs to fit graph_pattern.GPFitness
    res = (
        remaining_gain,
        score,
        gain,
        fm,
        avg_res_len,
        sum_gt_matches,
        pattern_length,
        pattern_vars,
        qtime_exceeded,
        query_time,
    )
    logger.log(
        config.LOGLVL_EVAL,
        'Run %d, Generation %d: evaluated fitness for %s%s\n%s',
        run, gen, graph_pattern, GPFitness(res).format_fitness(),
        graph_pattern.fitness.description
    )
    return res, matching_node_pairs, gtp_precisions


def update_individuals(individuals, eval_results):
    """Updates the given individuals with the eval_results in-place.

    :param individuals: A list of individuals (GraphPatterns).
    :param eval_results: A list of results calculated by evaluate(), in the
        order of individuals.
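        Each result is a triple of (fitness_values, matching_node_pairs,
        gtp_precisions), as returned by evaluate().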
    :return: None
    """
    for ind, res in zip(individuals, eval_results):
        ind.fitness.values = res[0]
        ind.matching_node_pairs = res[1]
        ind.gtp_precisions = res[2]


@lru_cache(maxsize=config.CACHE_SIZE_FIT_TO_LIVE)
def fit_to_live(child):
    # pattern length must be within [1, MAX_PATTERN_LENGTH]
    if not 1 <= len(child) <= config.MAX_PATTERN_LENGTH:
        return False
    if not child.vars_in_graph & {SOURCE_VAR, TARGET_VAR}:
        return False
    if len(child.vars_in_graph) > config.MAX_PATTERN_VARS:
        return False
    if len(child.to_sparql_select_query()) > config.MAX_PATTERN_QUERY_SIZE:
        return False
    if any([
        len(o.n3()) > config.MAX_LITERAL_SIZE
        for _, _, o in child
        if isinstance(o, Literal)
    ]):
        return False
    if any([
        isinstance(s, Literal) or isinstance(p, (BNode, Literal))
        for s, p, _ in child
    ]):
        return False
    if not child.is_connected(via_edges=config.PATTERN_P_CONNECTED):
        return False
    return True


def mate_helper(
        overlap, delta_dom, delta_other,
        pb_overlap, pb_dom, pb_other, pb_rename_delta_vars,
        retries,
):
    assert isinstance(overlap, set)
    assert isinstance(delta_dom, set)
    assert isinstance(delta_other, set)
    for _ in range(retries):
        overlap_part = [t for t in overlap if random.random() < pb_overlap]
        dom_part = [t for t in delta_dom if random.random() < pb_dom]
        other_part = [t for t in delta_other if random.random() < pb_other]
        if random.random() < pb_rename_delta_vars:
            other_part = replace_vars_with_random_vars(other_part)
        child = canonicalize(GraphPattern(overlap_part + dom_part + other_part))
        if fit_to_live(child):
            return child
        else:
            # most likely not connected, try connecting by merging var nodes
            child = canonicalize(mutate_merge_var(child))
            if fit_to_live(child):
                return child
    return None


def mate(
        individual1,
        individual2,
        pb_overlap=config.CXPB_BP,
        pb_dominant_parent=config.CXPB_DP,
        pb_other_parent=config.CXPB_OP,
        pb_rename_delta_vars=config.CXPB_RV,
        retries=config.CX_RETRY,
):
    # mate patterns:
    # We return 2 children:
    # child1, whose dominant parent is individual1, and
    # child2, whose dominant parent is individual2.
    # Both children draw triples from the overlap with pb_overlap.
    # child1 draws each triple of individual1 with prob pb_dominant_parent and
    # each triple of individual2 with prob pb_other_parent.
    # child2 swaps the probabilities accordingly.
    # The drawings are repeated up to retries times.
    # If no fit_to_live child is found, the dominant parent is returned.
    assert fit_to_live(individual1), 'unfit indiv in mating %r' % (individual1,)
    assert fit_to_live(individual2), 'unfit indiv in mating %r' % (individual2,)
    overlap = set(individual1) & set(individual2)
    delta1 = set(individual1) - overlap
    delta2 = set(individual2) - overlap
    child1 = mate_helper(
        overlap, delta1, delta2,
        pb_overlap, pb_dominant_parent, pb_other_parent, pb_rename_delta_vars,
        retries,
    ) or individual1
    child2 = mate_helper(
        overlap, delta2, delta1,
        pb_overlap, pb_dominant_parent, pb_other_parent, pb_rename_delta_vars,
        retries
    ) or individual2
    assert fit_to_live(child1), 'mating %r and %r produced unfit child %r' % (
        individual1, individual2, child1
    )
    assert fit_to_live(child2), 'mating %r and %r produced unfit child %r' % (
        individual1, individual2, child2
    )
    return child1, child2


def mutate_introduce_var(child):
    identifiers = tuple(child.identifier_counts(exclude_vars=True))
    if not identifiers:
        return child
    identifier = random.choice(identifiers)
    rand_var = gen_random_var()
    return GraphPattern(child, mapping={identifier: rand_var})


def mutate_split_var(child):
    # count triples that each var occurs in (not occurrences: (?s ?p ?s))
    var_trip_count = Counter([
        ti
        for t in child
        for ti in set(t)
        if isinstance(ti, Variable)
        # if ti not in (SOURCE_VAR, TARGET_VAR)  # why not allow them to split?
    ])
    # select vars that occur multiple times
    var_trip_count = Counter({i: c for i, c in var_trip_count.items() if c > 1})
    if not var_trip_count:
        return child
    var_to_split = random.choice(list(var_trip_count.elements()))
    triples_with_var = [t for t in child if var_to_split in t]
    triples = [t for t in child if var_to_split not in t]
    # randomly split triples_with_var into 2 non-zero length parts:
    # the first part where var_to_split is substituted and the 2nd where not
    random.shuffle(triples_with_var)
    split_idx = random.randrange(1, len(triples_with_var))
    triples += triples_with_var[split_idx:]
    rand_var = gen_random_var()
    triples += [
        tuple([rand_var if ti == var_to_split else ti for ti in t])
        for t in triples_with_var[:split_idx]
    ]
    gp = GraphPattern(triples)
    if not fit_to_live(gp):
        # can happen that we created a disconnected pattern:
        # orig:
        #   ?s ?p ?X, ?X ?q ?Y, ?Y ?r ?t
        # splitvar X:
        #   ?s ?p ?Z, ?X ?q ?Y, ?Y ?r ?t
        # try merging once, might lead to this:
        #   ?s ?p ?Y, ?X ?q ?Y, ?Y ?r ?t
        gp = mutate_merge_var(gp)
        if not fit_to_live(gp):
            return child
    return gp


def mutate_merge_var(child, pb_mv_mix=config.MUTPB_MV_MIX):
    if random.random() < pb_mv_mix:
        return mutate_merge_var_mix(child)
    else:
        return mutate_merge_var_sep(child)


def _mutate_merge_var_helper(vars_):
    rand_vars = vars_ - {SOURCE_VAR, TARGET_VAR}
    merge_able_vars = len(rand_vars) - 1
    if len(vars_) > len(rand_vars):
        # either SOURCE_VAR or TARGET_VAR is also available as merge target
        merge_able_vars += 1
    merge_able_vars = max(0, merge_able_vars)
    return rand_vars, merge_able_vars


def mutate_merge_var_mix(child):
    """Merges two variables into one, potentially merging node and edge vars."""
    vars_ = child.vars_in_graph
    rand_vars, merge_able_vars = _mutate_merge_var_helper(vars_)
    if merge_able_vars < 1:
        return child
    # merge vars, even mixing nodes and edges
    var_to_replace = random.choice(list(rand_vars))
    var_to_merge_into = random.choice(list(vars_ - {var_to_replace}))
    return GraphPattern(child, mapping={var_to_replace: var_to_merge_into})


def mutate_merge_var_sep(child):
    """Merges two variables into one, won't merge node and edge vars.

    Considers the node variables and edge variables separately. Depending on
    availability, either merges 2 node variables or 2 edge variables.
    """
    node_vars = {n for n in child.nodes if isinstance(n, Variable)}
    rand_node_vars, merge_able_node_vars = _mutate_merge_var_helper(node_vars)
    edge_vars = {e for e in child.edges if isinstance(e, Variable)}
    rand_edge_vars, merge_able_edge_vars = _mutate_merge_var_helper(edge_vars)

    if merge_able_node_vars < 1 and merge_able_edge_vars < 1:
        return child

    # randomly merge node or predicate vars proportional to their occurrences
    r = random.randrange(0, merge_able_node_vars + merge_able_edge_vars)
    if r < merge_able_node_vars:
        # we're merging node vars
        var_to_replace = random.choice(list(rand_node_vars))
        var_to_merge_into = random.choice(list(
            node_vars - {var_to_replace}))
    else:
        # we're merging predicate vars
        var_to_replace = random.choice(list(rand_edge_vars))
        var_to_merge_into = random.choice(list(
            edge_vars - {var_to_replace}))
    return GraphPattern(child, mapping={var_to_replace: var_to_merge_into})


def mutate_del_triple(child):
    l = len(child)
    if l < 2:
        return child
    new_child = GraphPattern(random.sample(child, l - 1))
    if not fit_to_live(new_child):
        return child
    else:
        return new_child


def _mutate_expand_node_helper(node, pb_en_out_link=config.MUTPB_EN_OUT_LINK):
    """Adds a new var-only triple to node.

    :param pb_en_out_link: Probability to create an outgoing triple.
    :return: The new triple, node var and edge var.
    """
    var_edge = gen_random_var()
    var_node = gen_random_var()
    if random.random() < pb_en_out_link:
        new_triple = (node, var_edge, var_node)
    else:
        new_triple = (var_node, var_edge, node)
    return new_triple, var_node, var_edge


def mutate_expand_node(
        child, node=None, pb_en_out_link=config.MUTPB_EN_OUT_LINK):
    """Expands a random node by adding a new var-only triple to it.

    Randomly selects a node. Then adds an outgoing or incoming triple with
    two new vars to it.

    :param child: The GraphPattern to expand a node in.
    :param node: If given, the node to expand, otherwise a random node of
        child is selected.
    :param pb_en_out_link: Probability to create an outgoing triple.
    :return: A child with the added outgoing/incoming triple.
    """
    # TODO: can maybe be improved by sparqling
    if not node:
        nodes = list(child.nodes)
        node = random.choice(nodes)
    new_triple, _, _ = _mutate_expand_node_helper(node, pb_en_out_link)
    return child + (new_triple,)


def mutate_add_edge(child):
    """Adds an edge between 2 randomly selected nodes.

    Randomly selects two nodes, then adds a new triple (n1, e, n2), where e
    is a new variable.

    :return: A child with the added edge.
    """
    # TODO: can maybe be improved by sparqling
    nodes = list(child.nodes)
    if len(nodes) < 2:
        return child
    node1, node2 = random.sample(nodes, 2)
    var_edge = gen_random_var()
    new_triple = (node1, var_edge, node2)
    return child + (new_triple,)


def mutate_increase_dist(child):
    """Increases the distance between ?source and ?target by one hop.

    Randomly adds a var-only triple to the ?source or ?target var. Then swaps
    the new node with ?source/?target to increase the distance by one hop.

    :return: A child with increased distance between ?source and ?target.
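
    Illustrative example (not from the original docstring): applied to
    { (?source, ?e1, ?target) } and picking ?source, a new triple like
    (?source, ?e2, ?vn) is added; swapping ?source and ?vn then yields
    { (?vn, ?e1, ?target), (?vn, ?e2, ?source) }, moving ?source one hop
    further away from ?target.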
""" if not child.complete(): return child var_node = gen_random_var() var_edge = gen_random_var() old_st = random.choice([SOURCE_VAR, TARGET_VAR]) new_triple = random.choice([ (old_st, var_edge, var_node), # outgoing new triple (var_node, var_edge, old_st), # incoming new triple ]) new_child = child + (new_triple,) # replace the old source/target node with the new node and vice-versa to # move the old node one hop further away from everything else new_child = new_child.replace({old_st: var_node, var_node: old_st}) return new_child def mutate_fix_var_filter(item_counts): """Filters results of fix var mutation in-place. Excludes: - too long literals - URIs with encoding errors (real world!) - BNode results (they will not be fixed but stay SPARQL vars) - NaN or INF literals (Virtuoso bug https://github.com/openlink/virtuoso-opensource/issues/649 ) """ assert isinstance(item_counts, Counter) for i in list(item_counts.keys()): if isinstance(i, Literal): i_n3 = i.n3() if len(i_n3) > config.MAX_LITERAL_SIZE: logger.debug( 'excluding very long literal %d > %d from mutate_fix_var:\n' '%s...', len(i_n3), config.MAX_LITERAL_SIZE, i_n3[:128] ) del item_counts[i] elif i.datatype in (XSD['float'], XSD['double']) \ and six.text_type(i).lower() in ('nan', 'inf'): logger.debug('excluding %s due to Virtuoso Bug', i_n3) del item_counts[i] elif isinstance(i, URIRef): # noinspection PyBroadException try: i.n3() except Exception: # sadly RDFLib doesn't raise a more specific one # it seems some SPARQL endpoints (Virtuoso) are quite liberal # during their import process, so it can happen that we're # served broken URIs, which break when re-inserted into SPARQL # later by calling URIRef.n3() logger.warning( 'removed invalid URI from mutate_fix_var:\n%r', i ) del item_counts[i] elif isinstance(i, BNode): # make sure that BNodes stay variables logger.info('removed BNode from mutate_fix_var') del item_counts[i] else: logger.warning( 'exlcuding unknown result type from mutate_fix_var:\n%r', i ) del item_counts[i] @exception_stack_catcher def mutate_fix_var( sparql, timeout, gtp_scores, child, gtp_sample_max_n=config.MUTPB_FV_RGTP_SAMPLE_N, rand_var=None, sample_max_n=config.MUTPB_FV_SAMPLE_MAXN, limit=config.MUTPB_FV_QUERY_LIMIT, ): """Finds possible fixations for a randomly selected variable of the pattern. This is the a very important mutation of the gp learner, as it is the main source of actually gaining information from the SPARQL endpoint. The outline of the mutation is as follows: - If not passed in, randomly selects a variable (rand_var) of the pattern (node or edge var, excluding ?source and ?target). - Randomly selects a subset of up to gtp_sample_max_n GTPs with probabilities according to their remaining gains. The number of GTPs picked is randomized (see below). - Issues SPARQL queries to find possible fixations for the selected variable under the previously selected GTPs subset. Counts the fixation's occurrences wrt. the GTPs and sorts the result descending by these counts. - Limits the result rows to deal with potential long-tails. - Filters the resulting rows with mutate_fix_var_filter. - From the limited, filtered result rows randomly selects up to sample_max_n candidate fixations with probabilities according to their counts. - For each candidate fixation returns a child in which rand_var is replaced with the candidate fixation. The reasons for fixing rand_var based on a randomly sized subset of GTPs are efficiency and shadowing problems with common long-tails. 
    Due to the later imposed limit (which is vital in real-world use-cases),
    a few remaining GTPs that share more than `limit` potential fixations (so
    have a common long-tail) could otherwise hide solutions for other
    remaining GTPs. This can be the case if these common fixations have low
    fitness. By randomizing the subset size, we will eventually (and more
    likely) select other combinations of remaining GTPs.

    :param sparql: SPARQLWrapper endpoint.
    :param timeout: Timeout in seconds for each individual query (gp).
    :param gtp_scores: Current GTPScores object for sampling.
    :param child: A graph pattern to mutate.
    :param gtp_sample_max_n: Maximum GTPs subset size to base fixations on.
    :param rand_var: If given, uses this variable instead of a random one.
    :param sample_max_n: Maximum number of children.
    :param limit: SPARQL limit for the top-k result rows.
    :return: A list of children in which the selected variable is substituted
        with fixation candidates wrt. GTPs.
    """
    assert isinstance(child, GraphPattern)
    assert isinstance(gtp_scores, GTPScores)

    # The further we get, the fewer gtps are remaining. Sampling too many
    # (all) of them might hurt, as common substitutions (> limit ones) which
    # are dead ends could cover less common ones that could actually help.
    gtp_sample_max_n = min(gtp_sample_max_n, int(gtp_scores.remaining_gain))
    gtp_sample_max_n = random.randint(1, gtp_sample_max_n)

    ground_truth_pairs = gtp_scores.remaining_gain_sample_gtps(
        max_n=gtp_sample_max_n)
    rand_vars = child.vars_in_graph - {SOURCE_VAR, TARGET_VAR}
    if len(rand_vars) < 1:
        return [child]
    if rand_var is None:
        rand_var = random.choice(list(rand_vars))
    t, substitution_counts = variable_substitution_query(
        sparql, timeout, child, rand_var, ground_truth_pairs, limit)
    if not substitution_counts:
        # the current pattern is unfit, as we can't find anything fulfilling it
        logger.debug("tried to fix a var %s without result:\n%s"
                     "seems as if the pattern can't be fulfilled!",
                     rand_var, child)
        return [child]
    mutate_fix_var_filter(substitution_counts)
    if not substitution_counts:
        # could have happened that we removed the only possible substitution
        return [child]
    # randomly pick n of the substitutions with a prob ~ to their counts
    items, counts = zip(*substitution_counts.most_common())
    substs = sample_from_list(items, counts, sample_max_n)
    logger.log(
        config.LOGLVL_MUTFV,
        'fixed variable %s in %sto:\n %s\n<%d out of:\n%s\n',
        rand_var.n3(),
        child,
        '\n '.join([subst.n3() for subst in substs]),
        sample_max_n,
        '\n'.join([
            ' %d: %s' % (c, v.n3())
            for v, c in substitution_counts.most_common()
        ]),
    )
    res = [
        GraphPattern(child, mapping={rand_var: subst})
        for subst in substs
    ]
    return res


def mutate_simplify_pattern(gp):
    if len(gp) < 2:
        return gp
    orig_gp = gp
    logger.debug('simplifying pattern\n%s', gp)

    # remove parallel variable edges (single variables only),
    # e.g., [ :x ?v1 ?y . :x ?v2 ?y . ] should remove :x ?v2 ?y .
    identifier_counts = gp.identifier_counts()
    edge_vars = [edge for edge in gp.edges if isinstance(edge, Variable)]
    # note that we also count occurrences in non-edge positions just to be
    # safe!
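    # Illustrative sketch: for [ :x ?v1 ?y . :x ?v2 ?y ], identifier_counts
    # would be {:x: 2, ?y: 2, ?v1: 1, ?v2: 1}, so ?v1 and ?v2 are edge vars
    # occurring exactly once and become parallel-edge removal candidates.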
    edge_var_counts = Counter({v: identifier_counts[v] for v in edge_vars})
    edge_vars_once = [var for var, c in edge_var_counts.items() if c == 1]
    for var in sorted(edge_vars_once, reverse=True):
        var_triple = [(s, p, o) for s, p, o in gp if p == var][0]  # only one
        s, _, o = var_triple
        parallel_triples = [
            t for t in gp if (t[0], t[2]) == (s, o) and t[1] != var
        ]
        if parallel_triples:
            # remove alpha-num largest var triple
            gp -= [var_triple]

    # remove edges between fixed nodes (fixed and single var edges from above)
    fixed_node_triples = [
        (s, p, o) for s, p, o in gp
        if not isinstance(s, Variable) and not isinstance(o, Variable)
    ]
    gp -= [
        (s, p, o) for s, p, o in fixed_node_triples
        if not isinstance(p, Variable) or p in edge_vars_once
    ]

    # remove unrestricting leaf edges (single occurring vars only) and leaves
    # behind fixed nodes
    # more explicit: such edges are
    # - single occurrence edge vars with a single occ gen var node,
    #   so (x, ?vp, ?vn) or (?vn, ?vp, x)
    # - or single occ gen var node behind fixed nodes
    old_gp = None
    while old_gp != gp:
        old_gp = gp
        gen_var_counts = gp.var_counts()
        for i in (SOURCE_VAR, TARGET_VAR):
            # remove all non generated vars from gen_var_counts
            del gen_var_counts[i]
        for t in gp:
            counts = [gen_var_counts[x] for x in t]
            s, p, o = t
            # get counts for s, p and p, o. if [1, 1] remove triple
            if ((
                # single occ edge var with single node var
                counts[0:2] == [1, 1] or counts[1:3] == [1, 1]
            ) or (
                # single occ gen var node behind fixed node
                counts[1] == 0 and (
                    (not isinstance(s, Variable) and counts[2] == 1) or
                    (not isinstance(o, Variable) and counts[0] == 1)
                )
            )):
                gp -= [t]

    # TODO: parallel edges like
    #   ?s <p1> ?v1 . ?v1 <p2> ?t .
    #   ?s <p3> ?v2 . ?v2 <p4> ?t .
    # TODO: remove fixed edge only connected patterns like
    #   ?s <p1> ?t . <n1> <p2> ?v1 . ?v1 <p3> ?v2 .
    # TODO: maybe remove disconnected components (relevance in reality?)

    if len(gp) < 1:  # for example: ?s ?v1 ?v2 .
        logger.log(
            config.LOGLVL_MUTSP,
            'simplification of the following pattern resulted in empty '
            'pattern, returning original pattern:\n%s',
            orig_gp,
        )
        return orig_gp

    if orig_gp == gp:
        logger.log(
            config.LOGLVL_MUTSP,
            'simplification had no effect on pattern:\n%s',
            gp,
        )
    else:
        logger.log(
            config.LOGLVL_MUTSP,
            'successfully simplified pattern:\n%swas simplified to:\n%s',
            orig_gp, gp,
        )
    return gp


@exception_stack_catcher
def mutate(
        sparql,
        timeout,
        gtp_scores,
        child,
        pb_ae=config.MUTPB_AE,
        pb_dt=config.MUTPB_DT,
        pb_en=config.MUTPB_EN,
        pb_fv=config.MUTPB_FV,
        pb_id=config.MUTPB_ID,
        pb_iv=config.MUTPB_IV,
        pb_mv=config.MUTPB_MV,
        pb_sp=config.MUTPB_SP,
        pb_sv=config.MUTPB_SV,
):
    # mutate patterns:
    # grow: select a random identifier and convert it into a var (local)
    # grow: select a var and randomly split it into 2 vars (local)
    # shrink: merge 2 vars (local)
    # shrink: del triple (local)
    # grow: select a random node, add edge with / without neighbor (local for
    #   now)
    # grow: select 2 random nodes, add an edge between them (local for now)
    # grow: increase distance between source and target by moving one of them
    #   a hop away
    # shrink: fix variable (SPARQL)
    # shrink: simplify pattern
    orig_child = child
    assert fit_to_live(child), 'mutation on unfit child: %r' % (child,)

    if random.random() < pb_iv:
        child = mutate_introduce_var(child)
    if random.random() < pb_sv:
        child = mutate_split_var(child)
    if random.random() < pb_mv:
        child = mutate_merge_var(child)
    if random.random() < pb_dt:
        child = mutate_del_triple(child)
    if random.random() < pb_en:
        child = mutate_expand_node(child)
    if random.random() < pb_ae:
        child = mutate_add_edge(child)
    if random.random() < pb_id:
        child = mutate_increase_dist(child)
    if random.random() < pb_sp:
        child = mutate_simplify_pattern(child)

    if random.random() < pb_fv:
        child = canonicalize(child)
        children = mutate_fix_var(sparql, timeout, gtp_scores, child)
    else:
        children = [child]

    # TODO: deep & narrow paths mutation

    children = {
        c if fit_to_live(c) else orig_child
        for c in children
    }
    children = {
        canonicalize(c)
        for c in children
    }
    return list(children)


def train(toolbox, population, run):
    hall_of_fame = deap.tools.HallOfFame(config.HOFSIZE)
    # pop = toolbox.population(n=50)
    pop = population
    g = 0
    logger.info(
        'Run %d, Generation %d: %d individuals',
        run, g, len(pop)
    )
    logger.debug('Population: %r', pop)

    # Evaluate the entire population
    _evaluate = partial(toolbox.evaluate, run=run, gen=g)
    eval_results = list(parallel_map(_evaluate, pop))
    logger.info(
        'Run %d, Generation %d: evaluated %d individuals',
        run, g, len(pop)
    )
    logger.debug('Evaluation results: %r', eval_results)
    update_individuals(pop, eval_results)
    hall_of_fame.update(pop)
    best_individual = hall_of_fame[0]
    best_individual_gen = g

    if not toolbox.generation_step_callback(g, pop):
        logger.info(
            'terminating learning as requested by generation_step_callback'
        )
        return g, pop, hall_of_fame

    # TODO: don't double eval same pattern? maybe a bit redundancy is good?
    # TODO: increase timeout if > x % of population fitnesses show timeout

    for g in range(1, config.NGEN + 1):
        # Select the next generation individuals
        offspring = toolbox.select(pop)
        logger.info(
            "Run %d, Generation %d: selected %d offspring individuals",
            run, g, len(offspring)
        )
        logger.debug('Offspring: %r', offspring)

        # Clone the selected individuals
        # offspring = map(toolbox.clone, offspring)

        # Apply crossover and mutation on the offspring
        tmp = []
        for child1, child2 in zip(offspring[::2], offspring[1::2]):
            if random.random() < config.CXPB:
                child1, child2 = toolbox.mate(child1, child2)
            tmp.append(child1)
            tmp.append(child2)
        offspring = tmp
        logger.info(
            "Run %d, Generation %d: %d individuals after mating",
            run, g, len(offspring)
        )
        logger.debug('Offspring: %r', offspring)

        mutants = []
        tmp = []
        for child in offspring:
            if random.random() < config.MUTPB:
                mutants.append(child)
            else:
                tmp.append(child)
        offspring = tmp
        logger.debug('Offspring in gen %d to mutate: %r', g, mutants)
        mutant_children = list(parallel_map(toolbox.mutate, mutants))
        logger.debug('Mutation results in gen %d: %r', g, mutant_children)
        for mcs in mutant_children:
            offspring.extend(mcs)
        logger.info(
            "Run %d, Generation %d: %d individuals after mutation",
            run, g, len(offspring)
        )
        logger.debug('Offspring: %r', offspring)

        # don't completely replace pop, but keep good individuals
        # will draw individuals from the first 10 % of the HOF
        # CDF: 0: 40 %, 1: 64 %, 2: 78 %, 3: 87 %, 4: 92 %, 5: 95 %, ...
        l = len(hall_of_fame)
        offspring += [
            hall_of_fame[int(random.betavariate(1, 50) * (l - 1))]
            for _ in range(config.HOFPAT_REINTRO)
        ]
        # always re-introduce some variable patterns with ?source and ?target
        offspring += [
            child
            for child in generate_variable_patterns(config.VARPAT_REINTRO)
            if fit_to_live(child)
        ]
        logger.info(
            'Run %d, Generation %d: %d individuals after re-adding '
            'hall-of-fame and variable patterns',
            run, g, len(offspring)
        )
        logger.debug('Offspring: %r', offspring)

        # Evaluate the individuals with an invalid fitness
        invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
        logger.info(
            "Run %d, Generation %d: %d individuals to evaluate",
            run, g, len(invalid_ind)
        )
        logger.debug('Evaluating individuals in gen %d: %r', g, invalid_ind)
        _evaluate = partial(toolbox.evaluate, run=run, gen=g)
        eval_results = list(parallel_map(_evaluate, invalid_ind))
        logger.info(
            "Run %d, Generation %d: %d individuals evaluated",
            run, g, len(eval_results)
        )
        logger.debug('Evaluation results in gen %d: %r', g, eval_results)
        update_individuals(invalid_ind, eval_results)
        hall_of_fame.update(invalid_ind)

        # replace population with this generation's offspring
        pop[:] = offspring
        logger.info(
            "Run %d, Generation %d: %d individuals",
            run, g, len(pop)
        )
        logger.debug('Population of generation %d: %r', g, pop)

        if not toolbox.generation_step_callback(g, pop):
            logger.info(
                'terminating learning as requested by '
                'generation_step_callback'
            )
            break

        if best_individual.fitness < hall_of_fame[0].fitness:
            best_individual = hall_of_fame[0]
            best_individual_gen = g
        if g >= best_individual_gen + config.NGEN_NO_IMPROVEMENT:
            logger.info(
                'terminating learning after generation %d: '
                'no better individual found since best in generation %d.',
                g, best_individual_gen
            )
            break
    return g, pop, hall_of_fame


def generate_variable_pattern(dist):
    # generates a variable pattern with a given dist between source and target
    # TODO: optimize by directly picking from all possible patterns?
    # The idea here is that we need some kind of "walk" / connection between
    # source and target; all else relies on mutations and mating to form more
    # complicated patterns. It's possible though, that actually just starting
    # from random triple patterns would speed up the algorithm, so maybe make
    # it an alternative?
    assert dist > 0
    # path of properties and targets of length dist ending at TARGET_VAR
    ps = [gen_random_var() for _ in range(dist)]
    ts = [gen_random_var() for _ in range(dist - 1)] + [TARGET_VAR]
    s = SOURCE_VAR  # start at source
    triples = []
    for p, t in zip(ps, ts):
        e = (s, p, t)
        if random.random() < .5:
            # edge in any direction
            e = (t, p, s)
        triples.append(e)
        s = t
    return canonicalize(GraphPattern(triples))


def generate_variable_patterns(count):
    u = Variable('u')
    p = Variable('p')
    v = Variable('v')
    s_out = canonicalize(GraphPattern([(SOURCE_VAR, p, v)]))
    s_in = canonicalize(GraphPattern([(u, p, SOURCE_VAR)]))
    t_out = canonicalize(GraphPattern([(TARGET_VAR, p, v)]))
    t_in = canonicalize(GraphPattern([(u, p, TARGET_VAR)]))
    res = []
    alpha = config.INIT_POP_LEN_ALPHA
    beta = config.INIT_POP_LEN_BETA
    for _ in range(count):
        # more likely to create shorter variable patterns, only a few
        # incomplete ones. With default values the distribution is as follows:
        # PDF: 0: 7 %, 1: 41 %, 2: 35 %, 3: 13 %, 4: 2.7 %, 5: 0.4 %, ...
        # CDF: 0: 7 %, 1: 48 %, 2: 84 %, 3: 96.8 %, 4: 99.5 %, 5: 99.9 %, ...
        dist = int(random.betavariate(alpha, beta) * config.MAX_PATTERN_LENGTH)
        if dist < 1:
            gp = random.choice((s_out, s_in, t_out, t_in))
        else:
            gp = generate_variable_pattern(dist)
        res.append(gp)
    # It can happen that a pattern is too long and thereby not fit_to_live:
    # it can be > config.MAX_PATTERN_LENGTH or > config.MAX_PATTERN_VARS.
    # The latter is sometimes desired if those variables go through
    # mutate_fix_var before ending up in any population.
    return res


def generate_init_population(
        sparql,
        timeout,
        gtp_scores,
        pb_fv=config.INIT_POPPB_FV,
        n=config.INIT_POPPB_FV_N,
        init_patterns=None,
        pb_init_pattern=config.INIT_POPPB_INIT_PAT,
):
    logger.info('generating init population of seed size %d', config.POPSIZE)
    population = []

    # Variable patterns:
    var_pats = generate_variable_patterns(config.POPSIZE)
    if init_patterns:
        # replace var pats with random ones from init_patterns according to
        # prob
        var_pats = [
            p for p in var_pats if random.random() > pb_init_pattern
        ]
        for _ in range(len(var_pats), config.POPSIZE):
            var_pats.append(random.choice(init_patterns).copy())
        random.shuffle(var_pats)

    # initial run of mutate_fix_var to instantiate many of the variable
    # patterns
    # TODO: maybe loop this? (why only try to fix one var?)
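    # Split below (descriptive comment, added): with probability pb_fv a
    # variable pattern goes through an initial mutate_fix_var round, otherwise
    # it is added to the population as-is.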
    to_fix = []
    for vp in var_pats:
        if random.random() < pb_fv:
            to_fix.append(vp)
        else:
            population.append(vp)
    logger.info(
        'initial mutate_fix_var run on %d individuals', len(to_fix)
    )
    p_mfv = partial(mutate_fix_var, sparql, timeout, gtp_scores, sample_max_n=n)
    fixed_result_patterns_per_vp = list(parallel_map(p_mfv, to_fix))
    for fixed_result_patterns in fixed_result_patterns_per_vp:
        population.extend(fixed_result_patterns)

    # rare possibility that patterns with too many vars are generated
    fit_population = [gp for gp in population if fit_to_live(gp)]
    l = len(fit_population)
    logger.info(
        'after initial mutate_fix_var run init population now has %d '
        'individuals', l
    )
    if l < config.POPSIZE:
        lvl = logging.DEBUG
        if l < 0.95 * config.POPSIZE:
            lvl = logging.INFO
        if l < 0.50 * config.POPSIZE:
            lvl = logging.WARNING
        sample_unfit = random.sample(
            [gp for gp in population if gp not in fit_population],
            min(10, len(population) - l)
        )
        logger.log(
            lvl,
            'generated init population of size %d < POPSIZE = %d\n'
            'check these config variables:\n'
            ' MAX_PATTERN_LENGTH: %d\n'
            ' MAX_PATTERN_VARS: %d\n'
            ' INIT_POP_LEN_ALPHA: %.3f\n'
            ' INIT_POP_LEN_BETA: %.3f\n'
            ' INIT_POPPB_FV: %.3f\n'
            ' INIT_POPPB_FV_N: %.3f\n'
            'it seems they are selected in a way that we are generating '
            'non-fit_to_live patterns in generate_variable_patterns().\n'
            'var_pats: %d\n'
            '%d graph patterns before dropping %d like these (samples):\n%s',
            l, config.POPSIZE,
            config.MAX_PATTERN_LENGTH,
            config.MAX_PATTERN_VARS,
            config.INIT_POP_LEN_ALPHA,
            config.INIT_POP_LEN_BETA,
            config.INIT_POPPB_FV,
            config.INIT_POPPB_FV_N,
            len(var_pats),
            len(population), len(population) - l,
            ''.join(str(gp) for gp in sample_unfit)
        )
    return fit_population


def generation_step_callback(
        run, gtp_scores, user_callback_per_generation, ngen, population
):
    """Called after each generation step cycle in train().

    :param run: Number of the current run.
    :param gtp_scores: gtp_scores as of start of this run.
    :param user_callback_per_generation: A user provided callback that is
        called after each training generation. If not None, it is called like
        this: user_callback_per_generation(run, gtp_scores, ngen, population)
        If user_callback_per_generation returns anything else than None, it
        overrides our return condition.
    :param ngen: The number of the current generation.
    :param population: The current population after generation ngen.
    :return: If user_callback_per_generation returns anything but None, it is
        used as the return value. Else, if config.QUICK_STOP and the current
        population is good enough to cover the remaining gains so we end up
        below the MIN_REMAINING_GAIN, we signal train() to quick-stop by
        returning False. Otherwise (the likely default), we signal train() to
        continue training by returning True.
""" assert isinstance(gtp_scores, GTPScores) top_counter = print_population(run, ngen, population) top_gps = sorted( top_counter.keys(), key=attrgetter("fitness"), reverse=True ) generation_gtp_scores = gtp_scores.copy_reset() generation_gtp_scores.update_with_gps(top_gps) save_generation( run, ngen, top_gps, generation_gtp_scores ) qsbs = [v for v in parallel_map(query_stats, [(run, ngen)] * 1000) if v] qs, bs = zip(*qsbs) qs = sum(qs) bs = Counter(bs).most_common() logger.info('QueryStats totals:\n Batch-Sizes: %s\n%s', kv_str(bs), qs) pause_if_signaled_by_file() if user_callback_per_generation: # user provided callback res = user_callback_per_generation(run, gtp_scores, ngen, population) if res is not None: return res pre_gtp_scores = gtp_scores.copy() pre_gtp_scores.update_with_gps(population) rem_gain = gtp_scores.remaining_gain pot_rem_gain = pre_gtp_scores.remaining_gain pot_gain = rem_gain - pot_rem_gain l = len(gtp_scores) logger.info( "Run %d:\n" " remains: %.1f / %d total = %.1f %%\n" " coverage: %.1f / %d total = %.1f %%\n" "Generation %d:\n" " potential remains: %.1f (%.1f %%)\n" " potential coverage: %.1f (%.1f %%)\n" "Potential gain: %.1f (%.1f %%)\n" '(without run post-processing, e.g., min-fitness filtering)', run, rem_gain, l, rem_gain * 100 / l, l - rem_gain, l, (l - rem_gain) * 100 / l, ngen, pot_rem_gain, pot_rem_gain * 100 / l, l - pot_rem_gain, (l - pot_rem_gain) * 100 / l, pot_gain, pot_gain * 100 / l, ) return not check_quick_stop(pre_gtp_scores) def check_quick_stop( pre_gtp_scores, quick_stop=config.QUICK_STOP, min_remaining_gain=config.MIN_REMAINING_GAIN, ): if quick_stop: if pre_gtp_scores.remaining_gain < min_remaining_gain: logger.info('quick-stop condition reached') return True return False def find_graph_patterns( sparql, run, gtp_scores, init_patterns=None, user_callback_per_generation=None, ): timeout = calibrate_query_timeout(sparql) toolbox = deap.base.Toolbox() toolbox.register( "mate", mate ) toolbox.register( "mutate", mutate, sparql, timeout, gtp_scores, ) toolbox.register( "select", deap.tools.selTournament, k=config.POPSIZE, tournsize=config.TOURNAMENT_SIZE, ) toolbox.register( "evaluate", evaluate, sparql, timeout, gtp_scores) toolbox.register( "generation_step_callback", generation_step_callback, run, gtp_scores, user_callback_per_generation ) population = generate_init_population( sparql, timeout, gtp_scores, init_patterns=init_patterns, ) # noinspection PyTypeChecker ngen, res_population, hall_of_fame = train(toolbox, population, run) print("\n\n\nhall of fame:") for r in hall_of_fame[:20]: assert isinstance(r, GraphPattern) print_graph_pattern(r) return ngen, res_population, hall_of_fame, toolbox def calc_min_fitness(gtp_scores, min_score): """Calculates the minimum desired fitness in the current run. In each run, the fitness tuple has a first component "remains", which is constant. The score is what we actually want to set to min_score. 
""" min_fitness = GPFitnessTuple( remains=gtp_scores.remaining_gain, score=min_score, ) return GPFitness(min_fitness) def _find_graph_pattern_coverage_run( sparql, min_score, run, coverage_counts, gtp_scores, patterns, init_patterns=None, user_callback_per_generation=None, user_callback_per_run=None, ): min_fitness = calc_min_fitness(gtp_scores, min_score) ngen, res_pop, hall_of_fame, toolbox = find_graph_patterns( sparql, run, gtp_scores, init_patterns=init_patterns, user_callback_per_generation=user_callback_per_generation, ) # TODO: coverage patterns should be chosen based on similarity new_best_patterns = [] for pat in hall_of_fame: if pat.fitness < min_fitness: logging.info( 'skipping remaining patterns cause they are below min_fitness:' '\n%s', min_fitness.format_fitness() ) break s_pat = canonicalize(mutate_simplify_pattern(pat)) if pat in patterns: logger.info( 'found pattern again, skipping for graph pattern coverage:\n' '%s', format_graph_pattern(pat, 0), ) elif s_pat in patterns: logger.info( 'found pattern again (simpler version already in results), ' 'skipping for graph pattern coverage:\n' 'Orig:\n%sSimplified:\n%s', format_graph_pattern(pat, 0), format_graph_pattern(s_pat, 0) ) else: if pat != s_pat: # seems the current pattern isn't as simple as possible, # check if the simplified version is better (expected) # make sure s_pat has a fitness # noinspection PyUnresolvedReferences update_individuals([s_pat], [toolbox.evaluate(s_pat)]) # noinspection PyProtectedMember if s_pat.fitness >= pat.fitness: logging.info( 'using improved result pattern by simplification:\n' 'Orig:\n%sSimplified:\n%s', format_graph_pattern(pat, 0), format_graph_pattern(s_pat, 0), ) pat = s_pat elif (s_pat.fitness.values._replace(qtime=0) == pat.fitness.values._replace(qtime=0)): # can happen that just vars were renamed and # the simplified query took a bit longer: logging.info( 'using canonical pattern even though a bit slower:\n' 'Orig:\n%sSimplified:\n%s', format_graph_pattern(pat, 0), format_graph_pattern(s_pat, 0), ) pat = s_pat else: if s_pat.fitness.values.timeout: logging.info( 'simplified pattern has worse fitness and timed ' 'out, using original instead:\n' 'Orig:\n%sSimplified:\n%s', format_graph_pattern(pat, 0), format_graph_pattern(s_pat, 0), ) else: logger.warning( 'simplified pattern has worse fitness, using ' 'original instead:\n' 'Orig:\n%sSimplified:\n%s', format_graph_pattern(pat, 0), format_graph_pattern(s_pat, 0), ) logger.info( 'found new pattern for graph pattern coverage:\n%s', format_graph_pattern(pat, 1000), ) new_best_patterns.append((pat, run)) # finally update gtp_scores with the new patterns (don't do this before as # evaluate of simplified patterns would otherwise have different remains and # thereby always return inferior results) for pat, run in new_best_patterns: coverage_counts.update(pat.matching_node_pairs) new_best_gps = [gp for gp, _ in new_best_patterns] gtp_scores.update_with_gps(new_best_gps) run_gtp_scores = gtp_scores.copy_reset() run_gtp_scores.update_with_gps(new_best_gps) save_run( new_best_patterns, coverage_counts=coverage_counts, run_gtp_scores=run_gtp_scores, overall_gtp_scores=gtp_scores, run=run, ) if user_callback_per_run: user_callback_per_run( run, gtp_scores, new_best_patterns, coverage_counts ) return new_best_patterns, coverage_counts, gtp_scores def find_graph_pattern_coverage( sparql, ground_truth_pairs, init_patterns=None, min_score=config.MIN_SCORE, min_remaining_gain=config.MIN_REMAINING_GAIN, max_runs=config.NRUNS, 
        runs_no_improvement=config.NRUNS_NO_IMPROVEMENT,
        error_retries=config.ERROR_RETRIES,
        user_callback_per_generation=None,
        user_callback_per_run=None,
):
    assert isinstance(ground_truth_pairs, tuple)
    logger.info(
        'Started learning:\n'
        'NRUNS=%d, NRUNS_NO_IMPROVEMENT=%d,\n'
        'NGEN=%d, NGEN_NO_IMPROVEMENT=%d',
        config.NRUNS, config.NRUNS_NO_IMPROVEMENT,
        config.NGEN, config.NGEN_NO_IMPROVEMENT,
    )
    error_count = 0
    # the following are modified in-place by _find_graph_pattern_coverage_run()
    ground_truth_pairs = list(ground_truth_pairs)
    coverage_counts = Counter({gtp: 0 for gtp in ground_truth_pairs})
    gtp_scores = GTPScores(ground_truth_pairs)
    patterns = {}
    last_pattern_update_in_run = 0
    run = 1
    while run <= max_runs:
        # noinspection PyBroadException
        try:
            # run in a loop and remove ground-truth pairs that are best
            # matched until we have good patterns for all gt pairs
            remaining_gain = gtp_scores.remaining_gain
            if remaining_gain < min_remaining_gain:
                break
            prev_run = find_run_result(run)
            if prev_run:
                res = load_results(prev_run)
            else:
                res = _find_graph_pattern_coverage_run(
                    sparql, min_score, run, coverage_counts, gtp_scores,
                    patterns,
                    init_patterns=init_patterns,
                    user_callback_per_generation=user_callback_per_generation,
                    user_callback_per_run=user_callback_per_run,
                )
            new_best_patterns, coverage_counts, gtp_scores = res
            patterns.update({pat: run for pat, run in new_best_patterns})
            new_remaining_gain = gtp_scores.remaining_gain
            if new_remaining_gain < remaining_gain:
                last_pattern_update_in_run = run
                logger.info(
                    'coverage improvement in run %d: %.2f - %.2f = %.2f',
                    run, remaining_gain, new_remaining_gain,
                    remaining_gain - new_remaining_gain
                )
            else:
                logger.info('no coverage improvement in run %d', run)
            if run >= last_pattern_update_in_run + runs_no_improvement:
                logger.info(
                    'no coverage improvement in the last %d runs, stopping',
                    runs_no_improvement
                )
                break
            run += 1
        except GPLearnerAbortException as e:
            # this exception is only raised to intentionally abort the whole
            # learning process. Don't run auto-retry.
            logger.info('gp learner was aborted intentionally: %r', e)
            raise
        except Exception as e:
            error_count += 1
            logger.error('uncaught exception in run %d', run)
            log_wrapped_exception(logger, e)
            if error_count > error_retries:
                logger.error(
                    'giving up after %d > %d errors',
                    error_count, error_retries
                )
                raise
            else:
                logger.error(
                    'this was uncaught exception number %d, will retry in %ds '
                    'despite error...',
                    error_count, config.ERROR_WAIT
                )
                logging_config.save_error_logs()
                sleep(config.ERROR_WAIT)

    # sort patterns by fitness, run and then pattern
    patterns = sorted(
        patterns.items(),
        key=lambda x: ([-v for v in x[0].fitness.wvalues], x[1], x[0])
    )
    return patterns, coverage_counts, gtp_scores


def predict_target_candidates(
        sparql, timeout, gps, source, parallel=None, exclude_source=None):
    """Uses the gps to predict target candidates for the given source.

    :param sparql: SPARQLWrapper endpoint.
    :param timeout: Timeout in seconds for each individual query (gp).
    :param gps: A list of evaluated GraphPattern objects (fitness is used).
    :param source: source node for which to predict target candidates.
    :param parallel: execute prediction queries in parallel?
    :param exclude_source: remove targets that are source node?
    :return: A list of target_candidate sets for each gp.
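
    The i-th entry of the returned list corresponds to gps[i]; order is
    preserved by the underlying map call.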
""" if parallel is None: parallel = config.PREDICTION_IN_PARALLEL if exclude_source is None: exclude_source = config.PREDICTION_EXCLUDE_SOURCE pq = partial( predict_query, sparql, timeout, source=source, ) map_ = parallel_map if parallel else map results = map_(pq, gps) # drop timings: res = [target_candidates for _, target_candidates in results] if exclude_source: res = [tcs - {source} for tcs in res] return res def predict_multi_target_candidates( sparql, timeout, gps, sources, parallel=None, exclude_source=None): """Uses the gps to predict target candidates for the given source. :param sparql: SPARQLWrapper endpoint. :param timeout: Timeout in seconds for each individual query (gp). :param gps: A list of evaluated GraphPattern objects (fitness is used). :param sources: source nodes for which to predict target candidates. :param parallel: execute prediction queries in parallel? :param exclude_source: remove targets that are source node? :return: A list containing a dict of source to set(tcs) for each gp. """ assert len(sources) > 1 and isinstance(sources[0], (URIRef, Literal)) if parallel is None: parallel = config.PREDICTION_IN_PARALLEL if exclude_source is None: exclude_source = config.PREDICTION_EXCLUDE_SOURCE pq = partial( predict_multi_query, sparql, timeout, sources=sources, ) map_ = parallel_map if parallel else map results = map_(pq, gps) # drop timings: res = [stcs for _, stcs in results] if exclude_source: res = [ OrderedDict([ (s, tcs - {s}) for s, tcs in stcs.items() ]) for stcs in res ] return res def predict_fused_targets( sparql, timeout, gps, source, parallel=None, fusion_methods=None, exclude_source=None, ): """Predict candidates and fuse the results.""" return fuse_prediction_results( gps, predict_target_candidates( sparql, timeout, gps, source, parallel, exclude_source), fusion_methods ) def find_in_prediction(prediction, target): try: targets, scores = zip(*prediction) return targets.index(target) except ValueError: return -1 def format_prediction_results(method, res, target=None, idx=None, n=10): assert not ((target is None) ^ (idx is None)), \ "target and idx should both be None or neither" rs = [ ' Top %d predictions (method: %s)%s' % ( n, method, (", target at idx: %d" % idx) if idx is not None else '' ) ] for t, score in res[:n]: rs.append( ' ' + ('->' if t == target else ' ') + '%s (%.3f)' % (t.n3(), score) ) return '\n'.join(rs) def print_prediction_results(method, res, target=None, idx=None, n=10): print(format_prediction_results(method, res, target, idx, n)) def evaluate_predictions( sparql, gps, gtps, gtp_predicted_fused_targets=None, fusion_methods=None): recall = 0 method_idxs = defaultdict(list) method_order = [] res_lens = [] timeout = calibrate_query_timeout(sparql) for i, (source, target) in enumerate(gtps, 1): print('%d/%d: predicting target for %s (ground truth: %s):' % ( i, len(gtps), source.n3(), target.n3())) if gtp_predicted_fused_targets: method_res = gtp_predicted_fused_targets[i-1] else: method_res = predict_fused_targets( sparql, timeout, gps, source, fusion_methods=fusion_methods) once = False if not method_order: method_order = method_res.keys() for method, res in method_res.items(): idx = find_in_prediction(res, target) if not once: once = True if idx < 0: print(' target not found') else: recall += 1 n = len(res) res_lens.append(n) print(' result list length: %d' % n) method_idxs[method].append(idx) print_prediction_results(method, res, target, idx) recall /= len(gtps) print("Ground Truth Pairs: %s" % gtps) print("Result list lenghts: %s" 
          % res_lens)
    print("Recall of test set: %.5f" % recall)

    for method, indices in [(m, method_idxs[m]) for m in method_order]:
        print("\nIndices for method %s:\n'%s': %s" % (method, method, indices))
        avg_idx = np.average([i for i in indices if i >= 0])
        median_idx = np.median([i for i in indices if i >= 0])
        ranks = np.array(indices, dtype='f8') + 1
        # noinspection PyTypeChecker
        mrr = np.sum(1 / ranks[ranks > 0]) / len(indices)
        # noinspection PyTypeChecker
        ndcg = np.sum(1 / np.log2(1 + ranks[ranks > 0])) / len(indices)
        # noinspection PyStringFormat
        print(
            " Avg. index %s: %.3f, Median index: %.3f\n"
            " MAP (MRR): %.3f, NDCG: %.3f" % (
                method, avg_idx, median_idx, mrr, ndcg))
        recalls_at = [
            (k, len([True for i in indices if k > i >= 0]) / len(indices))
            for k in (1, 2, 3, 5, 7, 10, 15, 20, 25, 30, 40, 50, 75, 100)
        ]
        print(" k:\t%s" % '\t'.join('% 5d' % k for k, r in recalls_at))
        print(" recall@k:\t%s" % '\t'.join('%.3f' % r for k, r in recalls_at))


# noinspection PyUnusedLocal
@log_all_exceptions(logger)
def main(
        sparql_endpoint=config.SPARQL_ENDPOINT,
        associations_filename=None,
        splitting_variant='random',
        train_filename=None,
        test_filename=None,
        swap_source_target=False,
        drop_invalid=False,
        init_patterns_filename=None,
        print_train_test_sets=True,
        reset=False,
        print_topn_raw_patterns=0,
        print_edge_only_connected_patterns=True,
        show_precision_loss_by_query_reduction=False,
        max_queries=0,
        clustering_variant=None,
        print_query_patterns=False,
        predict='',
        fusion_methods=None,
        tests=False,
        **kwds
):
    logging.info('encoding check: äöüß🎅')  # logging utf-8 byte string
    logging.info(u'encoding check: äöüß\U0001F385')  # logging unicode string
    logging.info(u'encoding check: äöüß\U0001F385'.encode('utf-8'))  # convert
    print('encoding check: äöüß🎅')  # printing utf-8 byte string
    print(u'encoding check: äöüß\U0001F385')  # printing unicode string

    # init workers
    init_workers()

    timer_start = datetime.utcnow()
    main_start = timer_start
    gsa = partial(
        get_semantic_associations,
        swap_source_target=swap_source_target,
        drop_invalid=drop_invalid,
    )
    if not train_filename and not test_filename:
        # get semantic association pairs and split into train and test sets
        semantic_associations = gsa(associations_filename)
        assocs_train, assocs_test = split_training_test_set(
            semantic_associations, variant=splitting_variant
        )
    else:
        assocs_train = gsa(train_filename) if train_filename else []
        assocs_test = gsa(test_filename) if test_filename else []
    if predict == 'train_set':
        assert assocs_train, 'trying to train but train file empty'
    if predict == 'test_set':
        assert assocs_test, 'trying to test but test file empty'

    logger.info(
        'training on %d association pairs and testing on %d',
        len(assocs_train), len(assocs_test)
    )
    sys.stdout.flush()
    sys.stderr.flush()

    if print_train_test_sets:
        if assocs_train:
            print(
                "Training Set Source Target Pairs:\n"
                "================================="
            )
            for s, t in assocs_train:
                print("Train: %s %s" % (s.n3(), t.n3()))
        if assocs_test:
            print(
                "\n\n"
                "Test Set Source Target Pairs:\n"
                "============================="
            )
            for s, t in assocs_test:
                print("Test: %s %s" % (s.n3(), t.n3()))
        sys.stdout.flush()
        sys.stderr.flush()

    semantic_associations = tuple(sorted(assocs_train))

    # setup node expander
    sparql = SPARQLWrapper.SPARQLWrapper(sparql_endpoint)

    init_patterns = None
    if init_patterns_filename:
        init_patterns = load_init_patterns(init_patterns_filename)

    if reset:
        remove_old_result_files()

    last_res = find_last_result()
    if not last_res:
        res = find_graph_pattern_coverage(
            sparql,
            semantic_associations,
            init_patterns=init_patterns,
        )
        result_patterns, coverage_counts, gtp_scores = res
        sys.stderr.flush()
        save_results(
            result_patterns,
            coverage_counts,
            gtp_scores,
        )
        timer_stop = datetime.utcnow()
        logging.info('Training took: %s', timer_stop - timer_start)
    else:
        result_patterns, coverage_counts, gtp_scores = load_results(last_res)
        timer_stop = datetime.utcnow()
        logging.info('Loading model took: %s', timer_stop - timer_start)
    timer_start = timer_stop
    sys.stdout.flush()
    sys.stderr.flush()

    if not result_patterns:
        print("It seems as if no patterns that satisfy your constraints could "
              "be found in training. Consider increasing QUERY_TIMEOUT_MIN, "
              "POPSIZE, decreasing MIN_SCORE or changing other parameters "
              "listed by --help")
        sys.exit(1)

    print_results(
        result_patterns, coverage_counts, gtp_scores,
        n=print_topn_raw_patterns,
        edge_only_connected_patterns=print_edge_only_connected_patterns,
    )
    gps = [gp for gp, _ in result_patterns]
    print('raw patterns: %d' % len(gps))
    sys.stdout.flush()
    sys.stderr.flush()

    if show_precision_loss_by_query_reduction:
        # amount of requests one wants to make for a prediction
        max_ks = [1, 2, 3, 4, 5, 7] + list(range(10, 101, 5))
        expected_precision_loss_by_query_reduction(
            gps, semantic_associations, max_ks, gtp_scores,
            variants=[clustering_variant] if clustering_variant else None,
            plot_precision_losses_over_k=True
        )
        sys.stdout.flush()
        sys.stderr.flush()

    # reduce gps by clustering if mandated by max_queries
    gps = cluster_gps_to_reduce_queries(
        gps, max_queries, gtp_scores, clustering_variant)
    if print_query_patterns:
        print(
            '\nusing the following %d graph patterns for prediction:'
            % len(gps)
        )
        for i, gp in enumerate(gps):
            print('Graph pattern #%d:' % i)
            print_graph_pattern(gp, print_matching_node_pairs=0)
    sys.stdout.flush()
    sys.stderr.flush()
    timer_stop = datetime.utcnow()
    logging.info('Query reduction took: %s', timer_stop - timer_start)
    timer_start = timer_stop

    if predict == 'train_set':
        loaded_predictions = load_predicted_target_candidates()
        gtps = assocs_train if predict == 'train_set' else assocs_test
        if not loaded_predictions:
            print('\n\n\nstarting prediction on %s' % predict)
            timeout = calibrate_query_timeout(sparql)
            gtp_gp_tcs = []
            for i, (source, target) in enumerate(gtps):
                logger.info(
                    '%d/%d: predicting target candidates for source: %s '
                    '(gt target: %s)',
                    i + 1, len(gtps), source, target
                )
                gtp_gp_tcs.append(
                    predict_target_candidates(sparql, timeout, gps, source)
                )
            save_predicted_target_candidates(gps, gtps, gtp_gp_tcs)
            sys.stdout.flush()
            sys.stderr.flush()
            timer_stop = datetime.utcnow()
            logging.info('Batch prediction of %s took: %s',
                         predict, timer_stop - timer_start)
            timer_start = timer_stop
        else:
            _gps, _gtps, gtp_gp_tcs = loaded_predictions
            assert gps == _gps, (
                "result patterns learned from previous execution did not "
                "match the current ones (e.g. due to changed --max_queries "
                "and/or clustering). Consider removing generated *.pkl.gz "
                "temp files"
            )
            assert gtps == _gtps, (
                "ground truth pairs from previous execution do not match the "
                "current ones (e.g. due to changed --associations_filename). "
                "Consider re-running full training with --reset, running in "
                "manual mode or invoking serve.py."
            )

        train_fusion_models(gps, gtps, gtp_gp_tcs, fusion_methods)
        timer_stop = datetime.utcnow()
        logging.info(
            'Training fusion models took: %s', timer_stop - timer_start)
        timer_start = timer_stop

        logging.info('Batch fusing all prediction candidates...')
        gtp_predicted_fused_targets = [
            fuse_prediction_results(gps, gp_tcs, fusion_methods)
            for gp_tcs in gtp_gp_tcs
        ]
        timer_stop = datetime.utcnow()
        logging.info('Batch fusing all prediction candidates took: %s',
                     timer_stop - timer_start)
        timer_start = timer_stop

        evaluate_predictions(sparql, gps, gtps, gtp_predicted_fused_targets)

    if predict == 'test_set':
        gtps = assocs_test
        print('\n\n\nstarting prediction on %s' % predict)
        evaluate_predictions(sparql, gps, gtps, fusion_methods=fusion_methods)
        sys.stdout.flush()
        sys.stderr.flush()
        timer_stop = datetime.utcnow()
        logging.info('Batch prediction of %s took: %s',
                     predict, timer_stop - timer_start)
        timer_start = timer_stop

    if predict == 'manual':
        timeout = calibrate_query_timeout(sparql)
        sys.stdout.flush()
        sys.stderr.flush()
        while True:
            s = six.moves.input(
                '\n\nEnter a DBpedia resource as source:\n'
                '> http://dbpedia.org/resource/'
            )
            source = URIRef('http://dbpedia.org/resource/' + s)
            # note: fusion_methods is passed by keyword here; passing it
            # positionally would end up in the parallel argument of
            # predict_fused_targets()
            method_res = predict_fused_targets(
                sparql, timeout, gps, source, fusion_methods=fusion_methods)
            for method, res in method_res.items():
                print_prediction_results(method, res)

    return sparql, gps, fusion_methods
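
# Usage sketch (an assumption, not part of the original file): as the guard at
# the top of this module indicates, it is meant to be driven via run.py. The
# messages printed above reference CLI flags such as --help, --reset,
# --max_queries and --associations_filename, which suggests run.py exposes
# main()'s keyword arguments as command line options; the exact invocation
# depends on run.py and the scoop-based worker setup.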