import threading from collections import OrderedDict, defaultdict from allure_commons.types import AttachmentType from allure_commons.model2 import ExecutableItem from allure_commons.model2 import TestResult from allure_commons.model2 import Attachment, ATTACHMENT_PATTERN from allure_commons.utils import now from allure_commons._core import plugin_manager class ThreadContextItems: _thread_context = defaultdict(OrderedDict) _init_thread: threading.Thread @property def thread_context(self): context = self._thread_context[threading.current_thread()] if not context and threading.current_thread() is not self._init_thread: uuid, last_item = next(reversed(self._thread_context[self._init_thread].items())) context[uuid] = last_item return context def __init__(self, *args, **kwargs): self._init_thread = threading.current_thread() super().__init__(*args, **kwargs) def __setitem__(self, key, value): self.thread_context.__setitem__(key, value) def __getitem__(self, item): return self.thread_context.__getitem__(item) def __iter__(self): return self.thread_context.__iter__() def __reversed__(self): return self.thread_context.__reversed__() def get(self, key): return self.thread_context.get(key) def pop(self, key): return self.thread_context.pop(key) def cleanup(self): stopped_threads = [] for thread in self._thread_context.keys(): if not thread.is_alive(): stopped_threads.append(thread) for thread in stopped_threads: del self._thread_context[thread] class AllureReporter: def __init__(self): self._items = ThreadContextItems() self._orphan_items = [] def _update_item(self, uuid, **kwargs): item = self._items[uuid] if uuid else self._items[next(reversed(self._items))] for name, value in kwargs.items(): attr = getattr(item, name) if isinstance(attr, list): attr.append(value) else: setattr(item, name, value) def _last_executable(self): for _uuid in reversed(self._items): if isinstance(self._items[_uuid], ExecutableItem): return _uuid def get_item(self, uuid): return self._items.get(uuid) def get_last_item(self, item_type=None): for _uuid in reversed(self._items): if item_type is None: return self._items.get(_uuid) if isinstance(self._items[_uuid], item_type): return self._items.get(_uuid) def start_group(self, uuid, group): self._items[uuid] = group def stop_group(self, uuid, **kwargs): self._update_item(uuid, **kwargs) group = self._items.pop(uuid) plugin_manager.hook.report_container(container=group) def update_group(self, uuid, **kwargs): self._update_item(uuid, **kwargs) def start_before_fixture(self, parent_uuid, uuid, fixture): self._items.get(parent_uuid).befores.append(fixture) self._items[uuid] = fixture def stop_before_fixture(self, uuid, **kwargs): self._update_item(uuid, **kwargs) self._items.pop(uuid) def start_after_fixture(self, parent_uuid, uuid, fixture): self._items.get(parent_uuid).afters.append(fixture) self._items[uuid] = fixture def stop_after_fixture(self, uuid, **kwargs): self._update_item(uuid, **kwargs) fixture = self._items.pop(uuid) fixture.stop = now() def schedule_test(self, uuid, test_case): self._items[uuid] = test_case def get_test(self, uuid): return self.get_item(uuid) if uuid else self.get_last_item(TestResult) def close_test(self, uuid): test_case = self._items.pop(uuid) self._items.cleanup() plugin_manager.hook.report_result(result=test_case) def drop_test(self, uuid): self._items.pop(uuid) def start_step(self, parent_uuid, uuid, step): parent_uuid = parent_uuid if parent_uuid else self._last_executable() if parent_uuid is None: self._orphan_items.append(uuid) else: self._items[parent_uuid].steps.append(step) self._items[uuid] = step def stop_step(self, uuid, **kwargs): if uuid in self._orphan_items: self._orphan_items.remove(uuid) else: self._update_item(uuid, **kwargs) self._items.pop(uuid) def _attach(self, uuid, name=None, attachment_type=None, extension=None, parent_uuid=None): mime_type = attachment_type extension = extension if extension else 'attach' if type(attachment_type) is AttachmentType: extension = attachment_type.extension mime_type = attachment_type.mime_type file_name = ATTACHMENT_PATTERN.format(prefix=uuid, ext=extension) attachment = Attachment(source=file_name, name=name, type=mime_type) last_uuid = parent_uuid if parent_uuid else self._last_executable() self._items[last_uuid].attachments.append(attachment) return file_name def attach_file(self, uuid, source, name=None, attachment_type=None, extension=None, parent_uuid=None): file_name = self._attach(uuid, name=name, attachment_type=attachment_type, extension=extension, parent_uuid=parent_uuid) plugin_manager.hook.report_attached_file(source=source, file_name=file_name) def attach_data(self, uuid, body, name=None, attachment_type=None, extension=None, parent_uuid=None): file_name = self._attach(uuid, name=name, attachment_type=attachment_type, extension=extension, parent_uuid=parent_uuid) plugin_manager.hook.report_attached_data(body=body, file_name=file_name)