|
@@ -173,7 +173,7 @@ public:
|
|
|
tail_counter.fetch_add(queue_rep_type::n_queue);
|
|
|
}
|
|
|
|
|
|
- bool pop( void* dst, ticket_type k, queue_rep_type& base, queue_allocator_type& allocator) {
|
|
|
+ bool pop( void* dst, ticket_type k, queue_rep_type& base, queue_allocator_type& allocator ) {
|
|
|
k &= -queue_rep_type::n_queue;
|
|
|
spin_wait_until_eq(head_counter, k);
|
|
|
d1::call_itt_notify(d1::acquired, &head_counter);
|
|
@@ -189,7 +189,7 @@ public:
|
|
|
k + queue_rep_type::n_queue, index == items_per_page - 1 ? p : nullptr );
|
|
|
if (p->mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) {
|
|
|
success = true;
|
|
|
- assign_and_destroy_item( dst, *p, index );
|
|
|
+ assign_and_destroy_item(dst, *p, index);
|
|
|
} else {
|
|
|
--base.n_invalid_entries;
|
|
|
}
|
|
@@ -276,36 +276,38 @@ public:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- padded_page* get_tail_page() {
|
|
|
- return tail_page.load(std::memory_order_relaxed);
|
|
|
- }
|
|
|
-
|
|
|
padded_page* get_head_page() {
|
|
|
return head_page.load(std::memory_order_relaxed);
|
|
|
}
|
|
|
|
|
|
- void set_tail_page( padded_page* pg ) {
|
|
|
- tail_page.store(pg, std::memory_order_relaxed);
|
|
|
- }
|
|
|
-
|
|
|
- void clear(queue_allocator_type& allocator ) {
|
|
|
- padded_page* curr_page = head_page.load(std::memory_order_relaxed);
|
|
|
- std::size_t index = head_counter.load(std::memory_order_relaxed);
|
|
|
+ void clear(queue_allocator_type& allocator, padded_page* new_head = nullptr, padded_page* new_tail = nullptr) {
|
|
|
+ padded_page* curr_page = get_head_page();
|
|
|
+ size_type index = (head_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue) % items_per_page;
|
|
|
page_allocator_type page_allocator(allocator);
|
|
|
|
|
|
- while (curr_page) {
|
|
|
- for (; index != items_per_page - 1; ++index) {
|
|
|
- curr_page->operator[](index).~value_type();
|
|
|
+ while (curr_page && is_valid_page(curr_page)) {
|
|
|
+ while (index != items_per_page) {
|
|
|
+ if (curr_page->mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) {
|
|
|
+ page_allocator_traits::destroy(page_allocator, &curr_page->operator[](index));
|
|
|
+ }
|
|
|
+ ++index;
|
|
|
}
|
|
|
- padded_page* next_page = curr_page->next;
|
|
|
- page_allocator_traits::destroy(page_allocator, curr_page);
|
|
|
- page_allocator_traits::deallocate(page_allocator, curr_page, 1);
|
|
|
- curr_page = next_page;
|
|
|
+
|
|
|
+ index = 0;
|
|
|
+ padded_page* next_page = curr_page->next;
|
|
|
+ page_allocator_traits::destroy(page_allocator, curr_page);
|
|
|
+ page_allocator_traits::deallocate(page_allocator, curr_page, 1);
|
|
|
+ curr_page = next_page;
|
|
|
}
|
|
|
+ head_counter.store(0, std::memory_order_relaxed);
|
|
|
+ tail_counter.store(0, std::memory_order_relaxed);
|
|
|
+ head_page.store(new_head, std::memory_order_relaxed);
|
|
|
+ tail_page.store(new_tail, std::memory_order_relaxed);
|
|
|
+ }
|
|
|
|
|
|
+ void clear_and_invalidate(queue_allocator_type& allocator) {
|
|
|
padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1));
|
|
|
- head_page.store(invalid_page, std::memory_order_relaxed);
|
|
|
- tail_page.store(invalid_page, std::memory_order_relaxed);
|
|
|
+ clear(allocator, invalid_page, invalid_page);
|
|
|
}
|
|
|
|
|
|
private:
|
|
@@ -430,18 +432,12 @@ public:
|
|
|
concurrent_queue_rep& operator=( const concurrent_queue_rep& ) = delete;
|
|
|
|
|
|
void clear( queue_allocator_type& alloc ) {
|
|
|
- page_allocator_type page_allocator(alloc);
|
|
|
- for (size_type i = 0; i < n_queue; ++i) {
|
|
|
- padded_page* tail_page = array[i].get_tail_page();
|
|
|
- if( is_valid_page(tail_page) ) {
|
|
|
- __TBB_ASSERT(array[i].get_head_page() == tail_page, "at most one page should remain" );
|
|
|
- page_allocator_traits::destroy(page_allocator, static_cast<padded_page*>(tail_page));
|
|
|
- page_allocator_traits::deallocate(page_allocator, static_cast<padded_page*>(tail_page), 1);
|
|
|
- array[i].set_tail_page(nullptr);
|
|
|
- } else {
|
|
|
- __TBB_ASSERT(!is_valid_page(array[i].get_head_page()), "head page pointer corrupt?");
|
|
|
- }
|
|
|
+ for (size_type index = 0; index < n_queue; ++index) {
|
|
|
+ array[index].clear(alloc);
|
|
|
}
|
|
|
+ head_counter.store(0, std::memory_order_relaxed);
|
|
|
+ tail_counter.store(0, std::memory_order_relaxed);
|
|
|
+ n_invalid_entries.store(0, std::memory_order_relaxed);
|
|
|
}
|
|
|
|
|
|
void assign( const concurrent_queue_rep& src, queue_allocator_type& alloc, item_constructor_type construct_item ) {
|
|
@@ -457,7 +453,7 @@ public:
|
|
|
}
|
|
|
}).on_exception( [&] {
|
|
|
for (size_type i = 0; i < queue_idx + 1; ++i) {
|
|
|
- array[i].clear(alloc);
|
|
|
+ array[i].clear_and_invalidate(alloc);
|
|
|
}
|
|
|
head_counter.store(0, std::memory_order_relaxed);
|
|
|
tail_counter.store(0, std::memory_order_relaxed);
|