Source code for helios.layouts.base

"""Network Layout Abstract Classes

This module provides a set of abstract classes to deal with
different network layouts algorithms using different communication
strategies.

"""

from abc import ABC, ABCMeta, abstractmethod
import time
import sys
import subprocess
import numpy as np

from fury.stream.tools import IntervalTimer

from helios.layouts.ipc_tools import ShmManagerMultiArrays


[docs]class NetworkLayout(ABC): @abstractmethod def steps(self, iterations=1): ... @property def positions(self): return self._super_actor.positions @positions.setter def positions(self, new_positions): self._positions = new_positions def update(self): self._network_draw.positions = self._positions self._network_draw.refresh()
[docs]class NetworkLayoutAsync(NetworkLayout, metaclass=ABCMeta): @abstractmethod def start(self, max_iterations=None): ... @abstractmethod def stop(self): ... def __del__(self): self.stop()
[docs]def is_running(p, timeout=0): '''Check if the process *p* is running Parameters ---------- p : process timeout : float, optional positive float Returns -------- running : bool ''' try: p.wait(timeout=timeout) running = False except subprocess.TimeoutExpired: assert p.returncode is None running = True return running
[docs]class NetworkLayoutIPCServerCalc(ABC): """An abstract class which reads the network information from the shared memory resources. This should be used inside of a subprocess which will update the network layout positions """ def __init__( self, num_nodes, num_edges, edges_buffer_name, positions_buffer_name, info_buffer_name, weights_buffer_name=None, dimension=3, snaphosts_buffer_name=None, num_snapshots=0, ): """ Parameters ---------- num_nodes : int num_edges : int edges_buffer_name : str positions_buffer_name : str info_buffer_name : str weights_buffer_name : str, optional dimension : int snaphosts_buffer_name : str, optional num_snapshots : int, optional """ self._dimension = dimension self._shm_manager = ShmManagerMultiArrays() self._shm_manager.load_array( 'info', buffer_name=info_buffer_name, dimension=1, dtype='float32', num_elements=3) self._shm_manager.load_array( 'positions', buffer_name=positions_buffer_name, dimension=self._dimension, dtype='float32', num_elements=num_nodes) self._shm_manager.load_array( 'edges', buffer_name=edges_buffer_name, dimension=2, dtype='int64', num_elements=num_edges) if weights_buffer_name is not None: self._shm_manager.load_array( 'weights', buffer_name=weights_buffer_name, dimension=1, dtype='float32', num_elements=num_edges) self._record_positions = snaphosts_buffer_name is not None if self._record_positions: self._shm_manager.load_array( 'snapshots_positions', buffer_name=snaphosts_buffer_name, dimension=self._dimension, dtype='float32', num_elements=num_nodes*num_snapshots) self._num_snapshots = num_snapshots
[docs] @abstractmethod def start(self, steps=100, iters_by_step=3): """This method starts the network layout algorithm. Parameters ---------- steps : int number of iterations to perform iters_by_step: int number of iterations to perform between each step """ ...
def _update(self, positions, step): """This method update the shared memory resource which stores the network positions. Usually, you should call this inside of the start method implementation Parameters ---------- positions : ndarray a numpy array with shape (num_nodes, self._dimension) """ # self._shm_manager.update_array(if self._record_positions: if self._record_positions: self._shm_manager.snapshots_positions.update_snapshot( positions, step % self._num_snapshots) self._shm_manager.positions.data = positions self._shm_manager.info._repr[0] = time.time() self._shm_manager.info._repr[2] = step def __del__(self): self._shm_manager.cleanup()
[docs]class NetworkLayoutIPCRender(ABC): """An abstract class which reads the network information and creates the shared memory resources. """ def __init__(self, network_draw, edges, weights=None): """ Parameters ---------- network_draw : NetworkDraw A NetworkDraw object which will be used to draw the network edges : ndarray a bi-dimensional array with the edges list weights : array, optional a one-dimensional array with the edge weights """ self._started = False self._interval_timer = None self._id_observer = None self._id_timer = None self._pserver = None self._network_draw = network_draw self._record_positions = False self._dimension = 2 if network_draw._is_2d else 3 self._shm_manager = ShmManagerMultiArrays() self._shm_manager.add_array( 'positions', network_draw.positions[:, 0:self._dimension], self._dimension, 'float32' ) self._shm_manager.add_array( 'info', np.array([0, 0, 0]), 1, 'float32' ) self._shm_manager.add_array( 'edges', edges, 2, 'int64' ) if weights is not None: self._shm_manager.add_array( 'weights', weights, 1, 'float32' ) self._num_nodes = network_draw.positions.shape[0] self._num_edges = edges.shape[0] @abstractmethod def _command_string(self, steps, iters_by_step): """Return the python code which will compute the layout positions. Parameters ---------- steps : int number of steps; snapshots. For example, if you set steps=3 that means the positions will be updated three times. iters_by_step : int Returns -------- command_string : str a string with a code that starts the layout algorithm. """ ...
[docs] def update(self): """This method updates the position of the network actor and right after that refresh the network draw """ if self._record_positions: self._network_draw.positions = self._shm_manager.\ snapshots_positions.get_snapshot( self._current_step, self._num_nodes) self._current_step += 1 self._current_step = self._current_step % self._steps else: self._network_draw.positions = self._shm_manager.positions.data self._network_draw.refresh()
[docs] def start( self, ms=30, steps=100, iters_by_step=2, record_positions=False, without_iren_start=True): """This method starts the network layout algorithm creating a new subprocess. Right after the network layout algorithm finish the computation (ending of the related subprocess), the stop method will be called automatically. Parameters ----------- ms : float time interval in milleseconds to update the positions inside of the NetworkDraw steps : int number of steps; snapshots. For example, if you set steps=3 that means the positions will be updated three times. iters_by_step : int number of interations in each step record_positions : bool, optional, default True Record the positions of the network without_iren_start : bool, optional, default True Set this to False if you will start the ShowManager. That is, if you will invoke the following commands Examples -------- >>> network_draw.initialize() >>> network_draw.start() """ if self._started: return self._record_positions = record_positions if record_positions: self._shm_manager.add_array( 'snapshots_positions', np.zeros( ( self._shm_manager.positions._num_elements*steps, self._dimension ) ), self._dimension, 'float32' ) self._steps = steps self._current_step = 0 self._last_update = time.time() args = [ sys.executable, '-c', self._command_string(steps, iters_by_step) ] self._pserver = subprocess.Popen( args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False) if ms > 0: def callback_update_pos(caller, event): should_update = self._check_and_sync() if should_update: self.update() if not without_iren_start: self._id_observer = \ self._network_draw.iren.AddObserver( "TimerEvent", callback_update_pos) self._id_timer = \ self._network_draw.iren.CreateRepeatingTimer(ms) else: self._interval_timer = IntervalTimer( ms/1000, callback_update_pos, *[None, None]) self._started = True
def _check_and_sync(self): """ This will check two conditions: 1 - If the positions in the shared memory resources have changed. 2 - If the process responsible to compute the layout positions finished the computations Returns ------- should_update : bool """ last_update = self._shm_manager.info._repr[0] self._last_update = last_update # if stop has been called inside the treading timer # maybe another callback can be executed ok = True if self._pserver is None: ok = False elif self._record_positions: ok = self._current_step < self._shm_manager.info._repr[2] # if the process finished then stop the callback elif not is_running(self._pserver, 0): self.stop() ok = False else: ok = True return ok
[docs] def stop(self): """Stop the layout algorithm """ if not self._started: return if self._interval_timer is not None: self._interval_timer.stop() self._interval_timer = None if self._id_timer is not None: self._network_draw.iren.DestroyTimer(self._id_timer) self._id_timer = None if self._id_observer is not None: self._network_draw.iren.RemoveObserver(self._id_observer) self._id_observer = None if self._pserver is not None: self._pserver.kill() self._pserver.wait() self._pserver = None if self._record_positions: self._shm_manager.cleanup_mem('snapshots_positions') self._started = False
[docs] def cleanup(self): """Release the shared memory resources """ self._shm_manager.cleanup()
def __del__(self): self.stop() self.cleanup()