123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534 |
"""Provides shared memory for direct access across processes.

The API of this package is currently provisional. Refer to the
documentation for details.
"""

# Public API of this module.
__all__ = [
    'SharedMemory',
    'ShareableList',
]
- from functools import partial
- import mmap
- import os
- import errno
- import struct
- import secrets
- import types
- if os.name == "nt":
- import _winapi
- _USE_POSIX = False
- else:
- import _posixshmem
- _USE_POSIX = True
- from . import resource_tracker
# Open flags for "create, and fail if the object already exists".
_O_CREX = os.O_CREAT | os.O_EXCL

# FreeBSD (and perhaps other BSDs) limit names to 14 characters.
_SHM_SAFE_NAME_LENGTH = 14

# Shared memory block name prefix.  POSIX shm names carry a leading slash;
# the Windows naming scheme does not.
_SHM_NAME_PREFIX = '/psm_' if _USE_POSIX else 'wnsm_'
def _make_filename():
    "Create a random filename for the shared memory object."
    # Every random byte is rendered as two hex digits, so the number of
    # bytes is half of whatever room the prefix leaves in the name.
    room_left = _SHM_SAFE_NAME_LENGTH - len(_SHM_NAME_PREFIX)
    nbytes = room_left // 2
    assert nbytes >= 2, '_SHM_NAME_PREFIX too long'
    candidate = f"{_SHM_NAME_PREFIX}{secrets.token_hex(nbytes)}"
    assert len(candidate) <= _SHM_SAFE_NAME_LENGTH
    return candidate
class SharedMemory:
    """Creates a new shared memory block or attaches to an existing
    shared memory block.

    Every shared memory block is assigned a unique name.  This enables
    one process to create a shared memory block with a particular name
    so that a different process can attach to that same shared memory
    block using that same name.

    As a resource for sharing data across processes, shared memory blocks
    may outlive the original process that created them.  When one process
    no longer needs access to a shared memory block that might still be
    needed by other processes, the close() method should be called.
    When a shared memory block is no longer needed by any process, the
    unlink() method should be called to ensure proper cleanup."""

    # Defaults; enables close() and unlink() to run without errors.
    _name = None
    _fd = -1
    _mmap = None
    _buf = None
    _flags = os.O_RDWR
    _mode = 0o600
    _prepend_leading_slash = True if _USE_POSIX else False

    def __init__(self, name=None, create=False, size=0):
        """Create (``create=True``) or attach to the block called *name*.

        *name* may be None only when creating; a random name is then
        generated.  *size* must be non-zero when creating.  When attaching,
        *size* is ignored and the existing block's size is used instead.

        Raises ValueError for an invalid *size*/*name* combination,
        FileExistsError when creating a block whose given name is taken,
        and other OSError subclasses from the underlying platform calls.
        """
        if not size >= 0:
            raise ValueError("'size' must be a positive integer")
        if create:
            self._flags = _O_CREX | os.O_RDWR
            if size == 0:
                raise ValueError("'size' must be a positive number different from zero")
        if name is None and not self._flags & os.O_EXCL:
            raise ValueError("'name' can only be None if create=True")

        if _USE_POSIX:

            # POSIX Shared Memory

            if name is None:
                # Keep drawing random names until shm_open succeeds; with
                # O_EXCL set, a collision surfaces as FileExistsError.
                while True:
                    name = _make_filename()
                    try:
                        self._fd = _posixshmem.shm_open(
                            name,
                            self._flags,
                            mode=self._mode
                        )
                    except FileExistsError:
                        continue
                    self._name = name
                    break
            else:
                name = "/" + name if self._prepend_leading_slash else name
                self._fd = _posixshmem.shm_open(
                    name,
                    self._flags,
                    mode=self._mode
                )
                self._name = name
            try:
                if create and size:
                    os.ftruncate(self._fd, size)
                # Read the real size back; when attaching this is the only
                # source of the block's size.
                stats = os.fstat(self._fd)
                size = stats.st_size
                self._mmap = mmap.mmap(self._fd, size)
            except OSError:
                # Release the descriptor now instead of waiting for __del__.
                self.close()
                # Only destroy the block if this call created it: a process
                # that merely attached must not remove a block that other
                # processes may still be using.  shm_unlink is called
                # directly because the name has not been registered with the
                # resource tracker yet, so unlink() would ask the tracker to
                # unregister a name it never saw.
                if create:
                    _posixshmem.shm_unlink(self._name)
                raise

            resource_tracker.register(self._name, "shared_memory")

        else:

            # Windows Named Shared Memory

            if create:
                while True:
                    temp_name = _make_filename() if name is None else name
                    # Create and reserve shared memory block with this name
                    # until it can be attached to by mmap.
                    h_map = _winapi.CreateFileMapping(
                        _winapi.INVALID_HANDLE_VALUE,
                        _winapi.NULL,
                        _winapi.PAGE_READWRITE,
                        (size >> 32) & 0xFFFFFFFF,
                        size & 0xFFFFFFFF,
                        temp_name
                    )
                    try:
                        last_error_code = _winapi.GetLastError()
                        if last_error_code == _winapi.ERROR_ALREADY_EXISTS:
                            if name is not None:
                                raise FileExistsError(
                                    errno.EEXIST,
                                    os.strerror(errno.EEXIST),
                                    name,
                                    _winapi.ERROR_ALREADY_EXISTS
                                )
                            else:
                                # Random name collided; try another one.
                                continue
                        self._mmap = mmap.mmap(-1, size, tagname=temp_name)
                    finally:
                        _winapi.CloseHandle(h_map)
                    self._name = temp_name
                    break

            else:
                self._name = name
                # Dynamically determine the existing named shared memory
                # block's size which is likely a multiple of mmap.PAGESIZE.
                h_map = _winapi.OpenFileMapping(
                    _winapi.FILE_MAP_READ,
                    False,
                    name
                )
                try:
                    p_buf = _winapi.MapViewOfFile(
                        h_map,
                        _winapi.FILE_MAP_READ,
                        0,
                        0,
                        0
                    )
                finally:
                    _winapi.CloseHandle(h_map)
                try:
                    size = _winapi.VirtualQuerySize(p_buf)
                finally:
                    _winapi.UnmapViewOfFile(p_buf)
                self._mmap = mmap.mmap(-1, size, tagname=name)

        self._size = size
        self._buf = memoryview(self._mmap)

    def __del__(self):
        # Best-effort cleanup at garbage collection; OSError is ignored.
        try:
            self.close()
        except OSError:
            pass

    def __reduce__(self):
        # Unpickling attaches (create=False) to the same named block.
        return (
            self.__class__,
            (
                self.name,
                False,
                self.size,
            ),
        )

    def __repr__(self):
        return f'{self.__class__.__name__}({self.name!r}, size={self.size})'

    @property
    def buf(self):
        "A memoryview of contents of the shared memory block."
        return self._buf

    @property
    def name(self):
        "Unique name that identifies the shared memory block."
        reported_name = self._name
        # Hide the leading slash that __init__ prepends on POSIX.
        if _USE_POSIX and self._prepend_leading_slash:
            if self._name.startswith("/"):
                reported_name = self._name[1:]
        return reported_name

    @property
    def size(self):
        "Size in bytes."
        return self._size

    def close(self):
        """Closes access to the shared memory from this instance but does
        not destroy the shared memory block."""
        # Release the memoryview before the mmap it references.
        if self._buf is not None:
            self._buf.release()
            self._buf = None
        if self._mmap is not None:
            self._mmap.close()
            self._mmap = None
        if _USE_POSIX and self._fd >= 0:
            os.close(self._fd)
            self._fd = -1

    def unlink(self):
        """Requests that the underlying shared memory block be destroyed.

        In order to ensure proper cleanup of resources, unlink should be
        called once (and only once) across all processes which have access
        to the shared memory block."""
        if _USE_POSIX and self._name:
            _posixshmem.shm_unlink(self._name)
            resource_tracker.unregister(self._name, "shared_memory")
# Text codec used for str items and for the stored struct format strings.
_encoding = 'utf8'
class ShareableList:
    """Pattern for a mutable list-like object shareable via a shared
    memory block.  It differs from the built-in list type in that these
    lists can not change their overall length (i.e. no append, insert,
    etc.)

    Because values are packed into a memoryview as bytes, the struct
    packing format for any storable value must require no more than 8
    characters to describe its format."""

    # The shared memory area is organized as follows:
    # - 8 bytes: number of items (N) as a 64-bit integer
    # - (N + 1) * 8 bytes: offsets of each element from the start of the
    #                      data area
    # - K bytes: the data area storing item values (with encoding and size
    #            depending on their respective types)
    # - N * 8 bytes: `struct` format string for each element
    # - N bytes: index into _back_transforms_mapping for each element
    #            (for reconstructing the corresponding Python value)
    _types_mapping = {
        int: "q",
        float: "d",
        bool: "xxxxxxx?",
        str: "%ds",
        bytes: "%ds",
        None.__class__: "xxxxxx?x",
    }
    _alignment = 8
    _back_transforms_mapping = {
        0: lambda value: value,                   # int, float, bool
        1: lambda value: value.rstrip(b'\x00').decode(_encoding),  # str
        2: lambda value: value.rstrip(b'\x00'),   # bytes
        3: lambda _value: None,                   # None
    }

    @staticmethod
    def _extract_recreation_code(value):
        """Used in concert with _back_transforms_mapping to convert values
        into the appropriate Python objects when retrieving them from
        the list as well as when storing them."""
        if not isinstance(value, (str, bytes, None.__class__)):
            return 0
        elif isinstance(value, str):
            return 1
        elif isinstance(value, bytes):
            return 2
        else:
            return 3  # NoneType

    def __init__(self, sequence=None, *, name=None):
        """Create a new list from *sequence*, or attach to *name*.

        With only *name* given, attach to an existing list.  Otherwise a new
        shared memory block (named *name*, or randomly if None) is created
        and filled with the items of *sequence*.

        Raises KeyError for item types not in _types_mapping, and ValueError
        if an item's packing format would not fit in 8 characters.
        """
        if name is None or sequence is not None:
            # Materialize once: the items are walked several times below,
            # which would silently exhaust a one-shot iterator.
            sequence = list(sequence) if sequence is not None else []
            # Encode str items up front so slot sizes are measured in bytes;
            # a multi-byte UTF-8 string needs more room than its length in
            # characters, and "%ds" would otherwise truncate it.
            _encoded = [
                item.encode(_encoding) if isinstance(item, str) else item
                for item in sequence
            ]
            _formats = [
                self._types_mapping[type(item)] % (
                    self._alignment * (len(value) // self._alignment + 1),
                )
                if isinstance(item, (str, bytes))
                else self._types_mapping[type(item)]
                for item, value in zip(sequence, _encoded)
            ]
            self._list_len = len(_formats)
            # Each format is stored in a fixed 8-byte field ("8s"); a longer
            # one would be truncated and make the list unreadable.
            if any(len(fmt) > 8 for fmt in _formats):
                raise ValueError(
                    "item too large: its struct packing format exceeds "
                    "8 characters"
                )
            offset = 0
            # The offsets of each list element into the shared memory's
            # data area (0 meaning the start of the data area, not the start
            # of the shared memory area).
            self._allocated_offsets = [0]
            for fmt in _formats:
                offset += self._alignment if fmt[-1] != "s" else int(fmt[:-1])
                self._allocated_offsets.append(offset)
            _recreation_codes = [
                self._extract_recreation_code(item) for item in sequence
            ]
            requested_size = struct.calcsize(
                "q" + self._format_size_metainfo +
                "".join(_formats) +
                self._format_packing_metainfo +
                self._format_back_transform_codes
            )

            self.shm = SharedMemory(name, create=True, size=requested_size)
        else:
            self.shm = SharedMemory(name)

        if sequence is not None:
            # New list: write header, data, formats and transform codes.
            struct.pack_into(
                "q" + self._format_size_metainfo,
                self.shm.buf,
                0,
                self._list_len,
                *(self._allocated_offsets)
            )
            struct.pack_into(
                "".join(_formats),
                self.shm.buf,
                self._offset_data_start,
                *_encoded
            )
            struct.pack_into(
                self._format_packing_metainfo,
                self.shm.buf,
                self._offset_packing_formats,
                *(v.encode(_encoding) for v in _formats)
            )
            struct.pack_into(
                self._format_back_transform_codes,
                self.shm.buf,
                self._offset_back_transform_codes,
                *(_recreation_codes)
            )

        else:
            # Attached: recover the layout from the header.
            self._list_len = len(self)  # Obtains size from offset 0 in buffer.
            self._allocated_offsets = list(
                struct.unpack_from(
                    self._format_size_metainfo,
                    self.shm.buf,
                    1 * 8
                )
            )

    def _get_packing_format(self, position):
        "Gets the packing format for a single value stored in the list."
        position = position if position >= 0 else position + self._list_len
        if not 0 <= position < self._list_len:
            raise IndexError("Requested position out of range.")

        v = struct.unpack_from(
            "8s",
            self.shm.buf,
            self._offset_packing_formats + position * 8
        )[0]
        fmt = v.rstrip(b'\x00')
        fmt_as_str = fmt.decode(_encoding)

        return fmt_as_str

    def _get_back_transform(self, position):
        "Gets the back transformation function for a single value."

        if not 0 <= position < self._list_len:
            raise IndexError("Requested position out of range.")

        transform_code = struct.unpack_from(
            "b",
            self.shm.buf,
            self._offset_back_transform_codes + position
        )[0]
        transform_function = self._back_transforms_mapping[transform_code]

        return transform_function

    def _set_packing_format_and_transform(self, position, fmt_as_str, value):
        """Sets the packing format and back transformation code for a
        single value in the list at the specified position."""

        if not 0 <= position < self._list_len:
            raise IndexError("Requested position out of range.")

        struct.pack_into(
            "8s",
            self.shm.buf,
            self._offset_packing_formats + position * 8,
            fmt_as_str.encode(_encoding)
        )

        transform_code = self._extract_recreation_code(value)
        struct.pack_into(
            "b",
            self.shm.buf,
            self._offset_back_transform_codes + position,
            transform_code
        )

    def __getitem__(self, position):
        position = position if position >= 0 else position + self._list_len
        # Check explicitly: an index below -len stays negative here and the
        # Python-level offset list would accept it, reading metadata bytes
        # as if they were an item instead of raising.
        if not 0 <= position < self._list_len:
            raise IndexError("index out of range")
        offset = self._offset_data_start + self._allocated_offsets[position]
        (v,) = struct.unpack_from(
            self._get_packing_format(position),
            self.shm.buf,
            offset
        )

        back_transform = self._get_back_transform(position)
        v = back_transform(v)

        return v

    def __setitem__(self, position, value):
        position = position if position >= 0 else position + self._list_len
        # Explicit bounds check (see __getitem__): without it an index below
        # -len would overwrite the offsets / format metadata.
        if not 0 <= position < self._list_len:
            raise IndexError("assignment index out of range")
        item_offset = self._allocated_offsets[position]
        offset = self._offset_data_start + item_offset
        current_format = self._get_packing_format(position)

        if not isinstance(value, (str, bytes)):
            new_format = self._types_mapping[type(value)]
            encoded_value = value
        else:
            allocated_length = self._allocated_offsets[position + 1] - item_offset

            encoded_value = (value.encode(_encoding)
                             if isinstance(value, str) else value)
            if len(encoded_value) > allocated_length:
                raise ValueError("bytes/str item exceeds available storage")
            if current_format[-1] == "s":
                new_format = current_format
            else:
                new_format = self._types_mapping[str] % (
                    allocated_length,
                )

        # Pack before touching any metadata so a value struct rejects
        # (e.g. an int outside the 64-bit range) leaves the list unchanged.
        packed = struct.pack(new_format, encoded_value)
        self._set_packing_format_and_transform(
            position,
            new_format,
            value
        )
        self.shm.buf[offset:offset + len(packed)] = packed

    def __reduce__(self):
        # Unpickling attaches to the same shared memory block by name.
        return partial(self.__class__, name=self.shm.name), ()

    def __len__(self):
        return struct.unpack_from("q", self.shm.buf, 0)[0]

    def __repr__(self):
        return f'{self.__class__.__name__}({list(self)}, name={self.shm.name!r})'

    @property
    def format(self):
        "The struct packing format used by all currently stored items."
        return "".join(
            self._get_packing_format(i) for i in range(self._list_len)
        )

    @property
    def _format_size_metainfo(self):
        "The struct packing format used for the items' storage offsets."
        return "q" * (self._list_len + 1)

    @property
    def _format_packing_metainfo(self):
        "The struct packing format used for the items' packing formats."
        return "8s" * self._list_len

    @property
    def _format_back_transform_codes(self):
        "The struct packing format used for the items' back transforms."
        return "b" * self._list_len

    @property
    def _offset_data_start(self):
        # - 8 bytes for the list length
        # - (N + 1) * 8 bytes for the element offsets
        return (self._list_len + 2) * 8

    @property
    def _offset_packing_formats(self):
        return self._offset_data_start + self._allocated_offsets[-1]

    @property
    def _offset_back_transform_codes(self):
        return self._offset_packing_formats + self._list_len * 8

    def count(self, value):
        "L.count(value) -> integer -- return number of occurrences of value."

        return sum(value == entry for entry in self)

    def index(self, value):
        """L.index(value) -> integer -- return first index of value.
        Raises ValueError if the value is not present."""

        for position, entry in enumerate(self):
            if value == entry:
                return position
        else:
            raise ValueError(f"{value!r} not in this container")

    __class_getitem__ = classmethod(types.GenericAlias)
|