Source code for priops.pathfinder

from priops.edge import IdentityEdge
from priops.path import Path

class NoPath(TypeError):
    """Raised by :class:`Pathfinder` when a search yields no path."""
class Pathfinder:
    """An algorithm for finding a path through the graph defined by a
    :class:`~priops.Priop`."""

    # Switch for the tracing done by :meth:`_debug`.  Off by default, so the
    # search is silent unless a caller (or subclass) turns it on.
    _debug_enabled = False

    def __init__(self, priop, max_components=None, max_weight=None):
        """Sets the Pathfinder up to search for paths in *priop* with
        parameters *max_components* and *max_weight*.

        *max_components* gives the maximum number of edges to combine into
        the path. *max_weight* gives the maximum weight of a path to be
        considered. The default for *max_components* is 1 and the default for
        *max_weight* is 1.0."""
        if max_components is None:
            max_components = 1
        if max_weight is None:
            max_weight = 1.0
        self.priop = priop
        self.max_components = max_components
        self.max_weight = max_weight

    def find_path(self, input_node, output_node):
        """Finds a best path to traverse *self.priop* from input node
        *input_node* to output node *output_node*.

        The search is bounded by *self.max_components* and
        *self.max_weight*. Raises :class:`NoPath` if no path is found."""
        return self._find_path(
            input_node=input_node,
            output_node=output_node,
            max_components=self.max_components,
            max_weight=self.max_weight)

    def _find_path(self, input_node, output_node, max_components, max_weight):
        """Finds a path from *input_node* to *output_node* using at most
        *max_components* edges and with maximum weight *max_weight*.

        Every contiguous, non-empty slice of *input_node* is matched against
        the input node of every edge in *self.priop*. An edge that fits is
        applied; if the result does not yet fulfil *output_node* and more
        components are allowed, the remainder is searched recursively.

        Returns the last path accepted. Since each accepted path lowers the
        weight bound for the following candidates, this is a lightest path
        found. Raises :class:`NoPath` if there is no path within the
        limits."""
        # The weight bound diminishes as soon as we have found a path.
        current_max_weight = max_weight
        current_path = None
        self._debug(max_components, input_node, "->", output_node)

        node_length = len(input_node)
        # Select a portion of the *input_node* to be consumed by an edge:
        for start_idx in range(node_length + 1):
            # We exclude spawning operands from nothing (deemed too slow), so
            # we start at *start_idx + 1* with the search for *stop_idx*:
            for stop_idx in range(start_idx + 1, node_length + 1):
                # What is the currently used part of the *input_node*?
                input_node_used = input_node[start_idx:stop_idx]
                self._debug(max_components, '*', "input node used:",
                            input_node_used)

                # Find an edge in *self.priop* that can be used for
                # *input_node_used*:
                for edge in self.priop:
                    self._debug(max_components, ' ', "trying edge", edge)

                    # Is the edge eligible?
                    if not (input_node_used.fulfils_specification(
                                edge.input_node)
                            and edge.weight <= current_max_weight):
                        continue
                    self._debug(max_components, ' ',
                                'the edge fulfils the specification')

                    untouched_before = input_node[:start_idx]
                    untouched_after = input_node[stop_idx:]

                    # Find the node resulting from applying the edge:
                    node_applied_edge = (untouched_before +
                                         edge.output_node +
                                         untouched_after)
                    self._debug(max_components, ' ',
                                'node with applied edge:', node_applied_edge)

                    # Compose the full edge, passing the untouched parts of
                    # the node through identity edges:
                    full_edge = (IdentityEdge(untouched_before) +
                                 edge +
                                 IdentityEdge(untouched_after))
                    self._debug(max_components, ' ', 'the full edge is:',
                                full_edge)

                    # Does this edge solve the problem?
                    if node_applied_edge.fulfils_specification(output_node):
                        self._debug(max_components, ' ',
                                    'the edge solves the problem')
                        current_path = Path([full_edge])
                        current_max_weight = current_path.weight
                        self._debug(max_components, ' ',
                                    'path stored, weight', current_max_weight)
                        continue

                    # This edge does not solve the problem alone; can we use
                    # more edges?
                    if max_components <= 1:
                        continue
                    self._debug(max_components, ' ',
                                'the edge does not solve the problem yet')

                    # Try to complete the path by finding more steps.  The
                    # budget for the completion is derived from the *current*
                    # bound, not the initial one, so that a heavier path can
                    # never replace a lighter one that was found earlier.
                    try:
                        completion = self._find_path(
                            input_node=node_applied_edge,
                            output_node=output_node,
                            max_components=(max_components - 1),
                            max_weight=(current_max_weight -
                                        full_edge.weight))
                    except NoPath:
                        # But there is no such path.
                        self._debug(max_components, ' ',
                                    'completion was unsuccessful')
                    else:
                        current_path = full_edge ^ completion
                        current_max_weight = current_path.weight
                        self._debug(max_components, ' ',
                                    'completion was successful with weight',
                                    current_max_weight)

        if current_path is None:
            # No path found
            self._debug(max_components, 'no path found')
            raise NoPath('No path found')

        self._debug(max_components, 'path found, weight', current_max_weight)
        return current_path

    def _debug(self, max_components, *msg):
        """Prints a debug message *msg* with indentation based on
        *max_components*, provided *self._debug_enabled* is true.

        If *max_components* equals *self.max_components*, no indentation will
        be done, with decreasing *max_components* more and more indentation
        will be done. Tracing is disabled by default, in which case this
        method does nothing."""
        if not self._debug_enabled:
            return
        indentation = " " * (self.max_components - max_components)
        # A single parenthesised argument keeps this line valid both as a
        # Python 2 print statement and as a Python 3 print() call.
        print(indentation + " " + " ".join(str(part) for part in msg))