import numpy as np
import multiprocessing
import time
import logging
from threading import Timer
from abc import ABC, abstractmethod
import asyncio
import sys
if sys.version_info >= (3, 8):
from multiprocessing import shared_memory
from multiprocessing import resource_tracker
PY_VERSION_8 = True
else:
shared_memory = None
PY_VERSION_8 = False
try:
import cv2
OPENCV_AVAILABLE = True
except ImportError:
from PIL import Image
import io
cv2 = None
OPENCV_AVAILABLE = False
def remove_shm_from_resource_tracker():
"""Monkey-patch multiprocessing.resource_tracker so SharedMemory won't
be tracked
More details at: https://bugs.python.org/issue38119
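
    Examples
    --------
    A hedged sketch: a consumer process might call this before
    attaching to a SharedMemory block created elsewhere, so the
    tracker does not unlink the block when this process exits.

    >>> # remove_shm_from_resource_tracker()
    >>> # shm = shared_memory.SharedMemory(name='existing_block')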
"""
    def fix_register(name, rtype):
        if rtype == "shared_memory":
            return
        return resource_tracker._resource_tracker.register(name, rtype)
    resource_tracker.register = fix_register

    def fix_unregister(name, rtype):
        if rtype == "shared_memory":
            return
        return resource_tracker._resource_tracker.unregister(name, rtype)
    resource_tracker.unregister = fix_unregister
if "shared_memory" in resource_tracker._CLEANUP_FUNCS:
del resource_tracker._CLEANUP_FUNCS["shared_memory"]
class GenericMultiDimensionalBuffer(ABC):
def __init__(
self, max_size=None, dimension=8):
"""This implements a abstract (generic) multidimensional buffer.
Parameters
----------
max_size : int, optional
If buffer_name or buffer was not passed then max_size
it's mandatory
dimension : int, default 8
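
        Examples
        --------
        A sketch of the slot layout computed by ``get_start_end``:
        element ``idx`` occupies the ``dimension``-wide slice
        ``[idx*dimension, (idx+1)*dimension)`` of the flat buffer.

        >>> # with dimension == 8:
        >>> # buffer.get_start_end(2) == (16, 24)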
"""
self.max_size = max_size
self.dimension = dimension
self.buffer_name = None
self._buffer = None
self._buffer_repr = None
self._created = False
@property
def buffer(self):
return self._buffer
    @buffer.setter
    def buffer(self, data):
        # Only float64 ndarrays are copied into the shared buffer;
        # any other input is silently ignored.
        if isinstance(data, (np.ndarray, np.generic)):
            if data.dtype == 'float64':
                self._buffer_repr[:] = data
def get_start_end(self, idx):
dim = self.dimension
start = idx*dim
end = dim*(idx+1)
return start, end
    def __getitem__(self, idx):
        start, end = self.get_start_end(idx)
        logging.info(f'dequeue start {int(time.time()*1000)}')
        ts = time.time()*1000
        items = self._buffer_repr[start:end]
        te = time.time()*1000
        logging.info(f'dequeue frombuffer cost {te-ts:.2f}')
        return items
    def __setitem__(self, idx, data):
        start, end = self.get_start_end(idx)
        if isinstance(data, (np.ndarray, np.generic)):
            if data.dtype == 'float64':
                self._buffer_repr[start:end] = data
@abstractmethod
def load_mem_resource(self):
pass
@abstractmethod
def create_mem_resource(self):
pass
@abstractmethod
def cleanup(self):
pass
class RawArrayMultiDimensionalBuffer(GenericMultiDimensionalBuffer):
def __init__(self, max_size, dimension=4, buffer=None):
"""This implements a multidimensional buffer with RawArray.
Stream system uses that to implemenet the CircularQueue
with shared memory resources.
Parameters
----------
max_size : int, optional
If buffer_name or buffer was not passed then max_size
it's mandatory
dimension : int, default 8
buffer : buffer, optional
If buffer is not passed to __init__
then the multidimensional buffer obj will create a new
RawArray object to store the data
If buffer is passed than this Obj will read a
a already created RawArray
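
        Examples
        --------
        A minimal sketch of the two roles; a consumer attaches to the
        producer's RawArray (in practice the RawArray is sent to a
        child process through ``multiprocessing``).

        >>> producer = RawArrayMultiDimensionalBuffer(max_size=10)
        >>> producer[0] = np.arange(4, dtype='float64')
        >>> consumer = RawArrayMultiDimensionalBuffer(
        ...     max_size=10, buffer=producer.buffer)
        >>> # consumer[0] now views the same four float64 values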
"""
super().__init__(max_size, dimension)
if buffer is None:
self.create_mem_resource()
else:
self._buffer = buffer
self.load_mem_resource()
def create_mem_resource(self):
buffer_arr = np.zeros(
self.dimension*(self.max_size+1), dtype='float64')
buffer = multiprocessing.RawArray(
'd', buffer_arr)
self._buffer = buffer
self._buffer_repr = np.ctypeslib.as_array(self._buffer)
    def load_mem_resource(self):
        # Recompute max_size from the RawArray length; one extra slot
        # was allocated at creation time, hence the -1.
        self.max_size = int(len(self._buffer)//self.dimension) - 1
        self._buffer_repr = np.ctypeslib.as_array(self._buffer)
def cleanup(self):
pass
class SharedMemMultiDimensionalBuffer(GenericMultiDimensionalBuffer):
def __init__(self, max_size, dimension=4, buffer_name=None):
"""This implements a generic multidimensional buffer
with SharedMemory. Stream system uses that to implemenet the
CircularQueue with shared memory resources.
Parameters
----------
max_size : int, optional
If buffer_name or buffer was not passed then max_size
it's mandatory
dimension : int, default 8
buffer_name : str, optional
if buffer_name is passed than this Obj will read a
a already created SharedMemory
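
        Examples
        --------
        A minimal sketch (Python >= 3.8); a consumer process would
        attach to the block by name instead of creating it.

        >>> producer = SharedMemMultiDimensionalBuffer(max_size=10)
        >>> consumer = SharedMemMultiDimensionalBuffer(
        ...     max_size=10, buffer_name=producer.buffer_name)
        >>> consumer.cleanup()
        >>> producer.cleanup()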
"""
super().__init__(max_size, dimension)
if buffer_name is None:
self.create_mem_resource()
self._created = True
else:
self.buffer_name = buffer_name
self.load_mem_resource()
self._created = False
def create_mem_resource(self):
buffer_arr = np.zeros(
self.dimension*(self.max_size+1), dtype='float64')
self._buffer = shared_memory.SharedMemory(
create=True, size=buffer_arr.nbytes)
        # Some macOS versions enforce a minimum shared memory block size
        # of 4096 bytes, so the allocated block can be larger than
        # requested; recompute max_size from the actual size
        # (8 bytes per float64 element)
        self.max_size = self._buffer.size//self.dimension//8 - 1
self._buffer_repr = np.ndarray(
self._buffer.size//8,
dtype='float64', buffer=self._buffer.buf)
self.buffer_name = self._buffer.name
logging.info([
'create repr multidimensional buffer ',
self._buffer_repr.shape, 'max size', self.max_size
])
    def load_mem_resource(self):
        self._buffer = shared_memory.SharedMemory(self.buffer_name)
        # 8 bytes per float64 element
        self.max_size = self._buffer.size//self.dimension//8 - 1
        self._buffer_repr = np.ndarray(
            self._buffer.size//8,
            dtype='float64', buffer=self._buffer.buf)
logging.info([
'load repr multidimensional buffer',
self._buffer_repr.shape, 'max size', self.max_size
])
    def cleanup(self):
        self._buffer.close()
        if self._created:
            # This is due to Python core issues:
            # https://bugs.python.org/issue38119
            # https://bugs.python.org/issue39959
            # https://github.com/luizalabs/shared-memory-dict/issues/13
            try:
                self._buffer.unlink()
            except FileNotFoundError:
                print(
                    f'Shared Memory {self.buffer_name} '
                    '(queue_event_buffer) file not found')
class GenericCircularQueue(ABC):
def __init__(
self, max_size=None, dimension=8,
use_shared_mem=False, buffer=None, buffer_name=None):
"""This implements a generic circular queue which works with
shared memory resources.
Parameters
----------
max_size : int, optional
If buffer_name or buffer was not passed then max_size
it's mandatory. This will be used to construct the
multidimensional buffer
dimension : int, default 8
This will be used to construct the multidimensional buffer
use_shared_mem : bool, default False
If the multidimensional memory resource should create or read
using SharedMemory or RawArrays
buffer : RawArray, optional
buffer_name: str, optional
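
        Examples
        --------
        A sketch of the protocol shared by the concrete subclasses:
        ``head == -1`` marks an empty queue, and the queue is full
        when advancing the tail would reach the head.

        >>> # ok = queue.enqueue(data)   # False if the queue was full
        >>> # data = queue.dequeue()     # None if the queue was empty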
"""
self._created = False
self.head_tail_buffer_name = None
self.head_tail_buffer_repr = None
self.head_tail_buffer = None
self._use_shared_mem = use_shared_mem
if use_shared_mem:
self.buffer = SharedMemMultiDimensionalBuffer(
max_size=max_size, dimension=dimension, buffer_name=buffer_name
)
else:
self.buffer = RawArrayMultiDimensionalBuffer(
max_size=max_size, dimension=dimension, buffer=buffer
)
@property
def head(self):
if self._use_shared_mem:
return self.head_tail_buffer_repr[0]
else:
return np.frombuffer(self.head_tail_buffer.get_obj(), 'int64')[0]
@head.setter
def head(self, value):
self.head_tail_buffer_repr[0] = value
@property
def tail(self):
if self._use_shared_mem:
return self.head_tail_buffer_repr[1]
else:
return np.frombuffer(self.head_tail_buffer.get_obj(), 'int64')[1]
@tail.setter
def tail(self, value):
self.head_tail_buffer_repr[1] = value
    def set_head_tail(self, head, tail, lock=1):
        # write head, tail and the lock flag in a single assignment
        self.head_tail_buffer_repr[0:3] = np.array(
            [head, tail, lock]).astype('int64')
    def _enqueue(self, data):
        ok = False
        # the queue is full when advancing the tail would reach the head
        if ((self.tail + 1) % self.buffer.max_size == self.head):
            ok = False
        else:
            if (self.head == -1):
                # head == -1 means the queue is empty; start at slot 0
                self.set_head_tail(0, 0, 1)
            else:
                self.tail = (self.tail + 1) % self.buffer.max_size
            self.buffer[self.tail] = data
            ok = True
        return ok
    def _dequeue(self):
        if self.head == -1:
            # the queue is empty
            interactions = None
        else:
            if self.head != self.tail:
                interactions = self.buffer[self.head]
                self.head = (self.head + 1) % self.buffer.max_size
            else:
                # last element: read it and reset the queue to empty
                interactions = self.buffer[self.head]
                self.set_head_tail(-1, -1, 1)
        return interactions
@abstractmethod
def enqueue(self):
pass
@abstractmethod
def dequeue(self):
pass
@abstractmethod
def load_mem_resource(self):
pass
@abstractmethod
def create_mem_resource(self):
pass
@abstractmethod
def cleanup(self):
pass
class ArrayCircularQueue(GenericCircularQueue):
def __init__(
self, max_size=10, dimension=6,
head_tail_buffer=None, buffer=None):
"""This implements a MultiDimensional Queue which works with
Arrays and RawArrays. Stream system uses that to implement
user interactions
Parameters
----------
max_size : int, optional
If buffer_name or buffer was not passed then max_size
it's mandatory. This will be used to construct the
multidimensional buffer
dimension : int, default 8
This will be used to construct the multidimensional buffer
head_tail_buffer : buffer, optional
If buffer is not passed to __init__
then this obj will create a new
RawArray to store head and tail position.
buffer : buffer, optional
If buffer is not passed to __init__
then the multidimensional buffer obj will create a new
RawArray to store the data
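
        Examples
        --------
        A minimal sketch; to share the queue, ``head_tail_buffer`` and
        the multidimensional buffer are passed to a child process
        through ``multiprocessing.Process``.

        >>> queue = ArrayCircularQueue(max_size=10, dimension=6)
        >>> ok = queue.enqueue(np.zeros(6, dtype='float64'))
        >>> data = queue.dequeue()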
"""
super().__init__(
max_size, dimension, use_shared_mem=False, buffer=buffer)
if head_tail_buffer is None:
self.create_mem_resource()
self._created = True
else:
self.head_tail_buffer = head_tail_buffer
self._created = False
self.head_tail_buffer_name = None
self.head_tail_buffer_repr = self.head_tail_buffer
if self._created:
self.set_head_tail(-1, -1, 0)
def load_mem_resource(self):
pass
    def create_mem_resource(self):
        # head_tail_arr[0] int; head position
        # head_tail_arr[1] int; tail position
        # head_tail_arr[2] int; lock flag (kept for layout compatibility;
        # this class relies on the Array's own lock instead)
        head_tail_arr = np.array([-1, -1, 0], dtype='int64')
        # 'q' guarantees 64-bit signed elements on every platform,
        # matching the 'int64' views used by the head/tail properties
        # ('l' is only 32 bits wide on Windows)
        self.head_tail_buffer = multiprocessing.Array(
            'q', head_tail_arr,
        )
def enqueue(self, data):
ok = False
with self.head_tail_buffer.get_lock():
ok = self._enqueue(data)
return ok
def dequeue(self):
with self.head_tail_buffer.get_lock():
interactions = self._dequeue()
return interactions
def cleanup(self):
pass
class SharedMemCircularQueue(GenericCircularQueue):
def __init__(
self, max_size=10, dimension=6,
head_tail_buffer_name=None, buffer_name=None):
"""This implements a MultiDimensional Queue which works with
SharedMemory.
Stream system uses that to implemenet user interactions
Parameters
----------
max_size : int, optional
If buffer_name or buffer was not passed then max_size
it's mandatory. This will be used to construct the
multidimensional buffer
dimension : int, default 8
This will be used to construct the multidimensional buffer
head_tail_buffer_name : str, optional
if buffer_name is passed than this Obj will read a
a already created SharedMemory with the head and tail
informations
buffer_name : str, optional
if buffer_name is passed than this Obj will read a
a already created SharedMemory to create the MultiDimensionalBuffer
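
        Examples
        --------
        A minimal sketch (Python >= 3.8); a consumer process attaches
        to both blocks by name.

        >>> queue = SharedMemCircularQueue(max_size=10, dimension=6)
        >>> consumer = SharedMemCircularQueue(
        ...     max_size=10,
        ...     head_tail_buffer_name=queue.head_tail_buffer_name,
        ...     buffer_name=queue.buffer.buffer_name)
        >>> ok = queue.enqueue(np.zeros(6, dtype='float64'))
        >>> data = consumer.dequeue()
        >>> consumer.cleanup()
        >>> queue.cleanup()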
"""
super().__init__(
max_size, dimension, use_shared_mem=True, buffer_name=buffer_name)
if head_tail_buffer_name is None:
self.create_mem_resource()
self._created = True
else:
self.head_tail_buffer_name = head_tail_buffer_name
self.load_mem_resource()
self._created = False
self.head_tail_buffer_repr = np.ndarray(
self.head_tail_buffer.size//8,
dtype='int64', buffer=self.head_tail_buffer.buf)
logging.info([
'create shared mem',
'size repr', self.head_tail_buffer_repr.shape,
'size buffer', self.head_tail_buffer.size//8
])
if self._created:
self.set_head_tail(-1, -1, 0)
def load_mem_resource(self):
self.head_tail_buffer = shared_memory.SharedMemory(
self.head_tail_buffer_name)
    def create_mem_resource(self):
        # head_tail_arr[0] int; head position
        # head_tail_arr[1] int; tail position
        # head_tail_arr[2] int; lock flag (see is_unlocked/lock/unlock)
        head_tail_arr = np.array([-1, -1, 0], dtype='int64')
        self.head_tail_buffer = shared_memory.SharedMemory(
            create=True, size=head_tail_arr.nbytes)
        self.head_tail_buffer_name = self.head_tail_buffer.name
    def is_unlocked(self):
        # The third slot of the head/tail buffer works as a cooperative
        # lock flag between processes (0: free, 1: taken). Note this is
        # a plain flag, not an atomic test-and-set.
        return self.head_tail_buffer_repr[2] == 0

    def lock(self):
        self.head_tail_buffer_repr[2] = 1

    def unlock(self):
        self.head_tail_buffer_repr[2] = 0
def enqueue(self, data):
ok = False
if self.is_unlocked():
self.lock()
ok = self._enqueue(data)
self.unlock()
return ok
def dequeue(self):
interactions = None
if self.is_unlocked():
self.lock()
interactions = self._dequeue()
self.unlock()
return interactions
def cleanup(self):
self.buffer.cleanup()
self.head_tail_buffer.close()
        if self._created:
            # This is due to Python core issues:
            # https://bugs.python.org/issue38119
            # https://bugs.python.org/issue39959
            # https://github.com/luizalabs/shared-memory-dict/issues/13
            try:
                self.head_tail_buffer.unlink()
            except FileNotFoundError:
                print(
                    f'Shared Memory {self.head_tail_buffer_name} '
                    '(head_tail) file not found')
class GenericImageBufferManager(ABC):
def __init__(
self, max_window_size=None, num_buffers=2, use_shared_mem=False):
"""This implements a abstract (generic) ImageBufferManager with
the n-buffer techinique.
Parameters
----------
max_window_size : tuple of ints, optional
This allows resize events inside of the FURY window instance.
Should be greater than the window size.
num_buffers : int, optional
Number of buffers to be used in the n-buffering
techinique.
use_shared_mem: bool, default False
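
        Examples
        --------
        A sketch of the n-buffering cycle with a concrete subclass: the
        producer writes a frame into the next free buffer and only then
        publishes its index, so readers never see a half-written frame.

        >>> # manager = RawArrayImageBufferManager(
        >>> #     max_window_size=(200, 200))
        >>> # frame = np.random.randint(
        >>> #     0, 255, 200*200*3).astype('uint8')
        >>> # manager.write_into(200, 200, frame)
        >>> # width, height, image = manager.get_current_frame()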
"""
self.max_window_size = max_window_size
self.num_buffers = num_buffers
self.info_buffer_size = num_buffers*2 + 2
self._use_shared_mem = use_shared_mem
self.max_size = None # int
self.num_components = 3
self.image_reprs = []
self.image_buffers = []
self.image_buffer_names = []
self.info_buffer_name = None
self.info_buffer = None
self.info_buffer_repr = None
self._created = False
    @property
    def next_buffer_index(self):
        # info_buffer_repr[1] holds the index of the buffer currently
        # on display; the producer writes into the following one
        index = int((self.info_buffer_repr[1]+1) % self.num_buffers)
        return index

    @property
    def buffer_index(self):
        index = int(self.info_buffer_repr[1])
        return index
    def write_into(self, w, h, np_arr):
        buffer_size = int(h*w)
        next_buffer_index = self.next_buffer_index
        if buffer_size == self.max_size:
            self.image_reprs[
                next_buffer_index][:] = np_arr
        elif buffer_size < self.max_size:
            self.image_reprs[
                next_buffer_index][0:buffer_size*3] = np_arr
        else:
            # the frame is larger than the allocated buffer: fall back
            # to a random image clamped to the maximum window size
            rand_img = np.random.randint(
                0, 255, size=self.max_size*3,
                dtype='uint8')
            self.image_reprs[
                next_buffer_index][:] = rand_img
            w = self.max_window_size[0]
            h = self.max_window_size[1]

        self.info_buffer_repr[2+next_buffer_index*2] = w
        self.info_buffer_repr[2+next_buffer_index*2+1] = h
        # publish the freshly written buffer as the current one
        self.info_buffer_repr[1] = next_buffer_index
def get_current_frame(self, from_buffer=False):
if not self._use_shared_mem:
image_info = np.frombuffer(
self.info_buffer, 'uint32')
else:
image_info = self.info_buffer_repr
buffer_index = int(image_info[1])
self.width = int(image_info[2+buffer_index*2])
self.height = int(image_info[2+buffer_index*2+1])
image = self.image_reprs[buffer_index]
self.image_buffer_repr = image
return self.width, self.height, image
    def get_jpeg(self):
        width, height, image = self.get_current_frame()
        if self._use_shared_mem:
            image = np.frombuffer(
                image, 'uint8')

        image = image[0:width*height*3].reshape(
            (height, width, 3))
        # the frame buffer is stored bottom-up; flip it vertically
        image = np.flipud(image)
        if OPENCV_AVAILABLE:
            # OpenCV expects BGR channel order; flip RGB -> BGR and make
            # the view contiguous before encoding
            image = np.ascontiguousarray(image[:, :, ::-1])
            image_encoded = cv2.imencode('.jpg', image)[1]
            bytes_img = image_encoded.tobytes()
        else:
            # PIL expects RGB, so the channel order is kept as-is
            image_encoded = Image.fromarray(np.ascontiguousarray(image))
            bytes_img_data = io.BytesIO()
            image_encoded.save(bytes_img_data, format='jpeg')
            bytes_img = bytes_img_data.getvalue()
        return bytes_img
async def async_get_jpeg(self, ms=33):
jpeg = self.get_jpeg()
await asyncio.sleep(ms/1000)
return jpeg
@abstractmethod
def load_mem_resource(self):
pass
@abstractmethod
def create_mem_resource(self):
pass
@abstractmethod
def cleanup(self):
pass
class RawArrayImageBufferManager(GenericImageBufferManager):
def __init__(
self, max_window_size=(100, 100), num_buffers=2,
image_buffers=None, info_buffer=None):
"""This implements an ImageBufferManager using RawArrays.
Parameters
----------
max_window_size : tuple of ints, optional
This allows resize events inside of the FURY window instance.
Should be greater than the window size.
num_buffers : int, optional
Number of buffers to be used in the n-buffering
techinique.
info_buffer : buffer, optional
A buffer with the information about the current
frame to be streamed and the respective sizes
image_buffers : list of buffers, optional
A list of buffers with each one containing a frame.
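
        Examples
        --------
        A minimal sketch; the consumer is built from the producer's
        raw buffers (in practice they are sent to a child process).

        >>> manager = RawArrayImageBufferManager(
        ...     max_window_size=(200, 200))
        >>> consumer = RawArrayImageBufferManager(
        ...     max_window_size=(200, 200),
        ...     image_buffers=manager.image_buffers,
        ...     info_buffer=manager.info_buffer)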
"""
super().__init__(max_window_size, num_buffers, use_shared_mem=False)
if image_buffers is None or info_buffer is None:
self.create_mem_resource()
else:
self.image_buffers = image_buffers
self.info_buffer = info_buffer
self.load_mem_resource()
def create_mem_resource(self):
self.max_size = self.max_window_size[0]*self.max_window_size[1]
self.max_size *= self.num_components
for _ in range(self.num_buffers):
buffer = multiprocessing.RawArray(
'B', np.random.randint(
0, 255,
size=self.max_size)
.astype('uint8'))
self.image_buffers.append(buffer)
self.image_reprs.append(
np.ctypeslib.as_array(buffer))
        # info_list stores the information about the n frame buffers
        # as well as the respective sizes:
        # 0: number of components
        # 1: index of the buffer currently on display
        # 2, 3: width and height of the first buffer
        # 4, 5: width and height of the second buffer
        info_list = [3, 0]
        for _ in range(self.num_buffers):
            info_list += [self.max_window_size[0]]
            info_list += [self.max_window_size[1]]
        # The 'I' typecode allocates 32-bit unsigned integers, so the
        # numpy views over this buffer must use 'uint32' as well
        # (get_current_frame already reads it with 'uint32')
        info_list = np.array(info_list, dtype='uint32')
        self.info_buffer = multiprocessing.RawArray(
            'I', info_list)
        self.info_buffer_repr = np.ctypeslib.as_array(self.info_buffer)
    def load_mem_resource(self):
        # Build a 'uint32' view matching the 'I' typecode used at
        # creation time; the RawArray itself is kept in info_buffer.
        self.info_buffer_repr = np.ctypeslib.as_array(self.info_buffer)
        for img_buffer in self.image_buffers:
            self.image_reprs.append(np.ctypeslib.as_array(img_buffer))
def cleanup(self):
pass
class SharedMemImageBufferManager(GenericImageBufferManager):
def __init__(
self, max_window_size=(100, 100), num_buffers=2,
image_buffer_names=None, info_buffer_name=None):
"""This implements an ImageBufferManager using the
SharedMemory approach. Python >=3.8 is a requirement
to use this object.
Parameters
----------
max_window_size : tuple of ints, optional
This allows resize events inside of the FURY window instance.
Should be greater than the window size.
num_buffers : int, optional
Number of buffers to be used in the n-buffering
techinique.
info_buffer_name : str
The name of a buffer with the information about the current
frame to be streamed and the respective sizes
image_buffer_names : list of str, optional
a list of buffer names. Each buffer contains a frame
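
        Examples
        --------
        A minimal sketch (Python >= 3.8); a consumer process attaches
        to the blocks by name.

        >>> manager = SharedMemImageBufferManager(
        ...     max_window_size=(200, 200))
        >>> consumer = SharedMemImageBufferManager(
        ...     max_window_size=(200, 200),
        ...     image_buffer_names=manager.image_buffer_names,
        ...     info_buffer_name=manager.info_buffer_name)
        >>> consumer.cleanup()
        >>> manager.cleanup()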
"""
super().__init__(max_window_size, num_buffers, use_shared_mem=True)
if image_buffer_names is None or info_buffer_name is None:
self.create_mem_resource()
self._created = True
else:
self.image_buffer_names = image_buffer_names
self.info_buffer_name = info_buffer_name
self._created = False
self.load_mem_resource()
def create_mem_resource(self):
self.max_size = self.max_window_size[0]*self.max_window_size[1]
self.max_size *= self.num_components
for _ in range(self.num_buffers):
buffer = shared_memory.SharedMemory(
create=True, size=self.max_size)
self.image_buffers.append(buffer)
self.image_reprs.append(
np.ndarray(
self.max_size, dtype=np.uint8, buffer=buffer.buf))
self.image_buffer_names.append(buffer.name)
        # same layout as in RawArrayImageBufferManager: [num components,
        # current buffer id, w0, h0, w1, h1, ...]
        info_list = [3, 0]
for _ in range(self.num_buffers):
info_list += [self.max_window_size[0]]
info_list += [self.max_window_size[1]]
info_list = np.array(
info_list, dtype='uint64'
)
self.info_buffer = shared_memory.SharedMemory(
create=True, size=info_list.nbytes)
self.info_buffer_repr = np.ndarray(
self.info_buffer.size//8,
dtype='uint64',
buffer=self.info_buffer.buf)
logging.info([
'info buffer create',
'buffer size', self.info_buffer.size,
'repr size', self.info_buffer_repr.shape
])
self.info_buffer_name = self.info_buffer.name
def load_mem_resource(self):
self.info_buffer = shared_memory.SharedMemory(self.info_buffer_name)
self.info_buffer_repr = np.ndarray(
self.info_buffer.size//8,
dtype='uint64',
buffer=self.info_buffer.buf)
logging.info([
'info buffer load',
'buffer size', self.info_buffer.size,
'repr size', self.info_buffer_repr.shape
])
for buffer_name in self.image_buffer_names:
buffer = shared_memory.SharedMemory(buffer_name)
self.image_buffers.append(buffer)
self.image_reprs.append(np.ndarray(
buffer.size,
dtype=np.uint8,
buffer=buffer.buf))
    def cleanup(self):
        self.info_buffer.close()
        # This is due to Python core issues:
        # https://bugs.python.org/issue38119
        # https://bugs.python.org/issue39959
        # https://github.com/luizalabs/shared-memory-dict/issues/13
        if self._created:
            try:
                self.info_buffer.unlink()
            except FileNotFoundError:
                print(
                    f'Shared Memory {self.info_buffer_name} '
                    '(info_buffer) file not found')
        for buffer, name in zip(
                self.image_buffers, self.image_buffer_names):
            buffer.close()
            if self._created:
                try:
                    buffer.unlink()
                except FileNotFoundError:
                    print(
                        f'Shared Memory {name} (buffer image) '
                        'file not found')
class IntervalTimer:
    def __init__(self, seconds, callback, *args, **kwargs):
"""Implements a object with the same behavior of setInterval from Js
Parameters
----------
seconds : float
A postive float number. Represents the total amount of
seconds between each call
callback : function
The function to be called
*args : args
args to be passed to callback
**kwargs : kwargs
kwargs to be passed to callback
Examples
-------
>>> def callback(arr):
>>> arr += [len(arr)]
>>> arr = []
>>> interval_timer = tools.IntervalTimer(1, callback, arr)
>>> interval_timer.start()
>>> time.sleep(5)
>>> interval_timer.stop()
>>> # len(arr) == 5
Refs
----
I got this from
https://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds
"""
self._timer = None
self.seconds = seconds
self.callback = callback
self.args = args
self.kwargs = kwargs
self.is_running = False
        self.start()
    def _run(self):
        # re-arm the timer before invoking the callback so the interval
        # keeps firing even if the callback runs long or raises
        self.is_running = False
        self.start()
        self.callback(*self.args, **self.kwargs)
    def start(self):
        """Start the timer."""
        if not self.is_running:
            self._timer = Timer(self.seconds, self._run)
            self._timer.daemon = True
            self._timer.start()
            self.is_running = True
    def stop(self):
        """Stop the timer."""
        if self._timer is not None:
            self._timer.cancel()
            self.is_running = False
            self._timer = None