lf_queue.h
#pragma once #include <iostream> #include <vector> #include <atomic> #include "macros.h" namespace Common { // 无锁队列模板类,T为存储元素类型 template<typename T> class LFQueue final { public: // 构造函数,预分配num_elems个元素的空间 LFQueue(std::size_t num_elems) : store_(num_elems, T()) /* 预分配vector存储空间,每个元素初始化为T的默认值 */ { } // 获取下一个可写入的位置,返回指向元素的指针 auto getNextToWriteTo() noexcept { return &store_[next_write_index_]; } // 写入后更新写指针索引,并递增元素计数 auto updateWriteIndex() noexcept { next_write_index_ = (next_write_index_ + 1) % store_.size(); num_elements_++; } // 获取下一个可读元素的指针,如果队列为空则返回nullptr auto getNextToRead() const noexcept -> const T * { return (size() ? &store_[next_read_index_] : nullptr); } // 读取后更新读指针索引,并递减元素计数 auto updateReadIndex() noexcept { next_read_index_ = (next_read_index_ + 1) % store_.size(); // 断言确保队列非空,避免无效读取 ASSERT(num_elements_ != 0, "Read an invalid element in:" + std::to_string(pthread_self())); num_elements_--; } // 获取当前队列中元素的数量 auto size() const noexcept { return num_elements_.load(); } // 禁用默认构造、拷贝构造、移动构造和赋值操作 // 这样可以防止队列被意外复制或移动,导致资源管理错误 LFQueue() = delete; LFQueue(const LFQueue &) = delete; LFQueue(const LFQueue &&) = delete; LFQueue &operator=(const LFQueue &) = delete; LFQueue &operator=(const LFQueue &&) = delete; private: // 存储队列元素的向量,预分配固定大小 std::vector<T> store_; // 原子类型的写索引,指向下一个可写入的位置 std::atomic<size_t> next_write_index_ = {0}; // 原子类型的读索引,指向下一个可读取的位置 std::atomic<size_t> next_read_index_ = {0}; // 原子类型的元素计数器,记录当前队列中的元素数量 std::atomic<size_t> num_elements_ = {0}; }; }
例子:
#include "thread_utils.h" #include "lf_queue.h" struct MyStruct { int d_[3]; }; using namespace Common; auto consumeFunction(LFQueue<MyStruct>* lfq) { using namespace std::literals::chrono_literals; std::this_thread::sleep_for(5s); while(lfq->size()) { const auto d = lfq->getNextToRead(); lfq->updateReadIndex(); std::cout << "consumeFunction read elem:" << d->d_[0] << "," << d->d_[1] << "," << d->d_[2] << " lfq-size:" << lfq->size() << std::endl; std::this_thread::sleep_for(1s); } std::cout << "consumeFunction exiting." << std::endl; } int main(int, char **) { LFQueue<MyStruct> lfq(20); auto ct = createAndStartThread(-1, "", consumeFunction, &lfq); for(auto i = 0; i < 50; ++i) { const MyStruct d{i, i * 10, i * 100}; *(lfq.getNextToWriteTo()) = d; lfq.updateWriteIndex(); std::cout << "main constructed elem:" << d.d_[0] << "," << d.d_[1] << "," << d.d_[2] << " lfq-size:" << lfq.size() << std::endl; using namespace std::literals::chrono_literals; std::this_thread::sleep_for(1s); } ct->join(); std::cout << "main exiting." << std::endl; return 0; }
系统当前共有 426 篇文章