Source code for helios.layouts.ipc_tools

"""Inter-Process communication tools

This module provides abstract classes and objects to deal with inter-process
communication.

References
----------
    [1]“Python GSoC - Post #3: Network layout algorithms using IPC -
    demvessias’s Blog.”
    `blogs.python-gsoc.org/en/demvessiass-blog/post-3
    <https://blogs.python-gsoc.org/en/demvessiass-blog/post-3-network-layout-algorithms-using-ipc/>`_ 
    (accessed Jul. 24, 2021).

"""

import numpy as np
from abc import ABC, abstractmethod

import sys

# ``multiprocessing.shared_memory`` only exists on Python 3.8 and newer.
# Compare the (major, minor) pair instead of ``minor`` alone so the check
# does not give the wrong answer on a different major version.
if sys.version_info >= (3, 8):
    from multiprocessing import shared_memory
    PY_VERSION_8 = True
else:
    # Placeholder so the name always exists; callers are expected to test
    # PY_VERSION_8 before using it.
    shared_memory = None
    PY_VERSION_8 = False


class GenericArrayBufferManager(ABC):
    """Abstract (generic) manager of an array buffer.

    A GenericArrayBufferManager is used, for example, to share the
    positions, edges and weights between different processes. Concrete
    subclasses must implement ``load_mem_resource``,
    ``create_mem_resource`` and ``cleanup``.
    """

    def __init__(self, dimension, dtype='float64', num_elements=None):
        """Record the description of the managed array.

        Parameters
        ----------
        dimension : int
        dtype : dtype
        num_elements : int, optional
            On macOS a shared memory resource can be created with a
            different number of elements than the original data.
        """
        self._num_elements = num_elements
        self._dimension = dimension
        self._dtype = dtype

    @abstractmethod
    def load_mem_resource(self):
        """Load a memory resource; must be implemented by subclasses."""

    @abstractmethod
    def create_mem_resource(self):
        """Create a memory resource; must be implemented by subclasses."""

    @abstractmethod
    def cleanup(self):
        """Release the memory resource; must be implemented by subclasses."""
class SharedMemArrayManager(GenericArrayBufferManager):
    """An implementation of a GenericArrayBufferManager using SharedMemory.

    The manager either creates a new shared memory segment from ``data`` or
    attaches to an existing one identified by ``buffer_name``, and exposes
    the segment as a numpy array view.
    """

    def __init__(
            self, dimension, dtype, data=None, buffer_name=None,
            num_elements=None):
        """
        Parameters
        ----------
        dimension : int
            number of columns
        dtype : str
            type of the ndarray
        data : ndarray
            bi-dimensional array (one-dimensional when ``dimension`` is 1).
            Required when ``buffer_name`` is not given.
        buffer_name : str
            buffer_name, if you pass that, then this Obj. will try to load
            the memory resource
        num_elements : int, optional
            On macOS a shared memory resource can be created with a
            different number of elements than the original data

        Raises
        ------
        ValueError
            If neither ``data`` nor ``buffer_name`` is provided.
        """
        super().__init__(dimension, dtype, num_elements)
        self._released = False
        if buffer_name is None:
            self.create_mem_resource(data)
        else:
            self.load_mem_resource(buffer_name)

    def create_mem_resource(self, data):
        """Create a new shared memory segment and copy ``data`` into it.

        Parameters
        ----------
        data : ndarray

        Raises
        ------
        ValueError
            If ``data`` is None.
        """
        if data is None:
            raise ValueError(
                'data must be provided when buffer_name is None')
        self._num_elements = data.shape[0]
        self._buffer = shared_memory.SharedMemory(
            create=True, size=data.nbytes)
        # This instance owns the segment, so cleanup() must unlink it.
        self._created = True
        self.create_repr()
        # The view can hold more rows than ``data`` (the segment may be
        # larger than requested); in that case only the leading rows are
        # filled.
        if self._repr.shape[0] >= data.shape[0]:
            self._repr[0:data.shape[0]] = data
        else:
            self._repr[:] = data

    def load_mem_resource(self, buffer_name):
        """Attach to the existing shared memory segment ``buffer_name``.

        Parameters
        ----------
        buffer_name : str
        """
        self._buffer = shared_memory.SharedMemory(buffer_name)
        # The segment was created elsewhere; cleanup() will not unlink it.
        self._created = False
        self.create_repr()

    def create_repr(self):
        """Build the numpy view (``self._repr``) over the shared buffer.

        The number of rows is derived from the real size of the buffer,
        which is why it is tracked separately from ``self._num_elements``.
        """
        item_size = np.dtype(self._dtype).itemsize
        self._num_elements_buffer = \
            self._buffer.size//item_size//self._dimension
        if self._dimension == 1:
            shape = self._num_elements_buffer
        else:
            shape = (self._num_elements_buffer, self._dimension)
        self._repr = np.ndarray(
            shape, dtype=self._dtype, buffer=self._buffer.buf)
        self._buffer_name = self._buffer.name

    def cleanup(self):
        """Close the shared memory and unlink it if this object created it.

        Calling this method more than once is a no-op.
        """
        if self._released:
            return

        self._buffer.close()
        # The FileNotFoundError handling is due to the python core issues
        # https://bugs.python.org/issue38119
        # https://bugs.python.org/issue39959
        # https://github.com/luizalabs/shared-memory-dict/issues/13
        if self._created:
            try:
                self._buffer.unlink()
            except FileNotFoundError:
                print(
                    f'Shared Memory {self._buffer_name} File not found')
        self._released = True

    @property
    def data(self):
        """ndarray : view of the first ``num_elements`` rows of the buffer."""
        return self._repr[0:self._num_elements]

    @data.setter
    def data(self, data):
        """Copy ``data`` into the leading rows of the buffer.

        Nothing is written when ``data`` has more rows than the buffer or
        when its number of columns does not match.
        """
        # A buffer created with dimension == 1 is one-dimensional and has
        # no shape[1]; in that case only require ``data`` to be 1-D too.
        if self._repr.ndim == 1:
            equal_dim = data.ndim == 1
        else:
            equal_dim = self._repr.shape[1] == data.shape[1]
        num_rows = data.shape[0]
        if equal_dim and self._repr.shape[0] >= num_rows:
            self._repr[0:num_rows] = data.astype(self._dtype)

    def update_snapshot(self, data, step):
        """Write ``data`` into the ``step``-th block of the buffer.

        Blocks have ``data.shape[0]`` rows each.

        Parameters
        ----------
        data : ndarray
        step : int
        """
        size = data.shape[0]
        start = step*size
        end = start + size
        self._repr[start:end] = data.astype(self._dtype)

    def get_snapshot(self, step, size):
        """Return the ``step``-th block of ``size`` rows of the buffer.

        Parameters
        ----------
        step : int
        size : int

        Returns
        -------
        ndarray
            A view into the shared buffer.
        """
        start = step*size
        end = start + size
        return self._repr[start:end]
class ShmManagerMultiArrays:
    """This Obj. allows to deal with multiple arrays stored using SharedMemory.

    Every registered array is exposed as an attribute of the manager, named
    after the ``attr_name`` given at registration time.
    """

    def __init__(self):
        # Names of the attributes that hold a shared memory array manager.
        self._shm_attr_names = []

    def _ensure_name_is_free(self, attr_name):
        """Raise ValueError if ``attr_name`` is already registered."""
        if attr_name in self._shm_attr_names:
            raise ValueError(
                f'A Shared Memory array with the name {attr_name} '
                'is already in this ShmManager')

    def add_array(self, attr_name, data, dimension, dtype):
        """This creates a shared memory resource to store the data.

        The shared memory obj will be accessible as the attribute
        ``attr_name`` of this manager.

        Parameters
        ----------
        attr_name : str
            used to associate a new attribute 'attr_name' with the current
            (self) ShmManagerMultiArrays.
        data : ndarray
        dimension : int
        dtype : str

        Raises
        ------
        ValueError
            If ``attr_name`` is already registered in this manager.
        """
        # Check first so that no memory resource is created for a name
        # that is going to be rejected.
        self._ensure_name_is_free(attr_name)

        _shm = SharedMemArrayManager(
            data=data.astype(dtype), dimension=dimension, dtype=dtype)
        self._shm_attr_names.append(attr_name)
        setattr(self, attr_name, _shm)

    def load_array(
            self, attr_name, buffer_name, dimension, dtype,
            num_elements=None):
        """This will load the shared memory resource associated with
        buffer_name into the current ShmManagerMultiArrays.

        The shared memory obj will be accessible as the attribute
        ``attr_name`` of this manager.

        Parameters
        ----------
        attr_name : str
            this name will be used to associate a new attribute
            'attr_name' with the current (self) ShmManagerMultiArrays.
        buffer_name : str
        dimension : int
        dtype : str
        num_elements : int, optional
            On macOS a shared memory resource can be created with a
            different number of elements than the original data

        Raises
        ------
        ValueError
            If ``attr_name`` is already registered in this manager.
        """
        self._ensure_name_is_free(attr_name)

        _shm = SharedMemArrayManager(
            buffer_name=buffer_name, dimension=dimension, dtype=dtype,
            num_elements=num_elements)
        self._shm_attr_names.append(attr_name)
        setattr(self, attr_name, _shm)

    def cleanup_mem(self, resource_name):
        """Clean up one registered array and remove it from this manager.

        Parameters
        ----------
        resource_name : str

        Raises
        ------
        ValueError
            If ``resource_name`` is not registered in this manager.
        """
        if resource_name not in self._shm_attr_names:
            raise ValueError(f'{resource_name} is not in this ShmManager')

        getattr(self, resource_name).cleanup()
        self._shm_attr_names.remove(resource_name)
        delattr(self, resource_name)

    def cleanup(self):
        """Call ``cleanup`` on every registered array.

        The arrays stay registered (attributes and names are kept).
        """
        for _shm_name in self._shm_attr_names:
            getattr(self, _shm_name).cleanup()