priority queue with updatable priority

事情是这样的,我正在维护一组会话,每个会话每隔一段时间都需要被轮询,每个会话轮询的间隔都是不等且变化的 —— 每次轮询之后,它会告诉我下次什么时候来重新轮询

当然,在那个它告诉我的 “下次轮询时间” 之前轮询它并不会有什么作用(也没有副作用),在那个时间之后轮询则会产生一点效率损失,比如会话上的包被更晚处理了,之类的

因此一开始我的实现非常粗暴:持续地以一个非常小的时间间隔 tick,每个 tick 都遍历所有的会话并轮询它们。但假设一个会话 100ms 后才需要再次轮询,我却每 5ms 都轮询一遍所有会话,显然做了许多无用功

于是我希望用优先队列来安排每个会话的轮询时间,当然,最先想到的就是 std::priority_queue

但还有一些需求无法被满足,因为会话的轮询还有一个变量:会话的下次轮询时间可能会改变

  • 假如会话 A 原本计于 100ms 后被再次轮询,但其上产生了一个新的消息(或是什么事件),则会话 A 可能需要在 10ms 后就被轮询

放到优先队列的语境里,即一个队列内元素的优先级可能会被改变,但 std::priority_queue 并不提供改变优先级的接口,事实上它也不能做到,因为它基于堆实现,想修改堆内任意一个元素的值而不破坏堆的性质,需要以 O(N) 的代价重建堆来实现,但我们期望一个 O(log n) 的更新手段

类似的需求也在 Dijkstra 算法中被用到,其解决办法在 Easiest way of using min priority queue with key update in C++ 中被提到:在 Dijkstra 算法对优先队列的需求中,可以通过 "lazy deletion" 来实现类似的功能,即不考虑更新元素的优先级,而是直接将更小优先级的元素插入队列中,新插入的元素总是会比旧元素更早被 pop,只需要加上去重逻辑,就变相 “更新” 了更新旧元素的优先级

但是在本项目中,我觉得这个方法并不适用,因为在 Dijkstra 中图的大小是 bounded 的,而会话上消息的产生是 unbounded 的,如果每次有新消息产生并要更新会话优先级时,都插入一个新元素,那 priority queue 可能会爆掉,或者产生效率问题(比如 pop 的时候需要去除大量重复元素)

因此,我希望有这样一个优先队列,它能够以某个优先级安排队列内所有元素的顺序,又可以快速地更新某个元素的优先级

实现

一个简单的实现如下

template <typename Priority, typename Val>
class UniquePriorityQueue
{
public:
    void push_or_update(const Priority &p, const Val &v)
    {
        auto vp_it = _vp_map.find(v); // O(log n)
        if (vp_it != _vp_map.end())
        { // update
            auto pv_it = _pv_map.find(vp_it->second); // O(log n)
            assert(pv_it != _pv_map.end());
            _pv_map.erase(pv_it); // O(1)

            vp_it->second = p;
            _pv_map.insert({vp_it->second, vp_it->first}); // O(log n)
        }
        else
        {
            auto [vp_it, success] = _vp_map.emplace(v, p); // O(log n)
            assert(success);
            _pv_map.insert({vp_it->second, vp_it->first}); // O(log n)
        }
    }

    void pop()
    {
        auto pv_it = _pv_map.begin();
        assert(pv_it != _pv_map.end());
        auto vp_it = _vp_map.find(pv_it->second);
        assert(vp_it != _vp_map.end());
        _pv_map.erase(pv_it);
        _vp_map.erase(vp_it);
    }

    const std::pair<const Priority&, const Val&> top() const {
        auto pv_it = _pv_map.begin();
        assert(pv_it != _pv_map.end());
        return {pv_it->first, pv_it->second};
    }

    bool empty() const {
        assert(_pv_map.size() == _vp_map.size());
        return _vp_map.empty();
    }
private:
    std::map<Priority, const Val &> _pv_map;
    std::unordered_map<Val, Priority> _vp_map;
};

之所以叫 UniquePriorityQueue,是因为队列内的 Val 是不重复的,如果一个 Val 已经存在,那么 push_or_update 的语义就是更新其优先级

各接口的复杂度为:

  • O(1): top(), empty()
  • O(log n): push_or_update(), pop()

以下是一个完整可编译的程序(C++20), 模拟了刚才所说的安排会话的例子

#include <map>
#include <unordered_map>
#include <vector>
#include <memory>
#include <cassert>
#include <random>

template <typename Priority, typename Val>
class UniquePriorityQueue
{
public:
    void push_or_update(const Priority &p, const Val &v)
    {
        auto vp_it = _vp_map.find(v);
        if (vp_it != _vp_map.end())
        { // update
            auto pv_it = _pv_map.find(vp_it->second);
            assert(pv_it != _pv_map.end());
            _pv_map.erase(pv_it);

            vp_it->second = p;
            _pv_map.insert({vp_it->second, vp_it->first});
        }
        else
        {
            auto [vp_it, success] = _vp_map.emplace(v, p);
            assert(success);
            _pv_map.insert({vp_it->second, vp_it->first});
        }
    }

    void pop()
    {
        auto pv_it = _pv_map.begin();
        assert(pv_it != _pv_map.end());
        auto vp_it = _vp_map.find(pv_it->second);
        assert(vp_it != _vp_map.end());
        _pv_map.erase(pv_it);
        _vp_map.erase(vp_it);
    }

    const std::pair<const Priority&, const Val&> top() const {
        auto pv_it = _pv_map.begin();
        assert(pv_it != _pv_map.end());
        return {pv_it->first, pv_it->second};
    }

    bool empty() const {
        assert(_pv_map.size() == _vp_map.size());
        return _vp_map.empty();
    }
private:
    std::map<Priority, const Val &> _pv_map;
    std::unordered_map<Val, Priority> _vp_map;
};

struct Job
{
    int id;
    int i = 0;
    std::vector<int> schedule;

    Job(int id) : id(id) {}

    int next()
    {
        if (i < schedule.size())
        {
            return schedule[i++];
        }
        else
        {
            return INT32_MAX;
        }
    }
};

constexpr int JOBS = 1000;
constexpr int ITERATIONS = 1000000;

int main()
{
    std::srand(time(nullptr));

    UniquePriorityQueue<int, std::shared_ptr<Job>> q;

    std::vector<std::shared_ptr<Job>> jobs;
    std::vector<Job *> schedule;

    // generate N jobs
    for (int i = 0; i < JOBS; i++)
    {
        jobs.push_back(std::make_shared<Job>(i));
    }

    // tick ITERATIONS times, each tick one random job should be done, use `schedule` to record the order
    for (int priority = 0; priority < ITERATIONS; priority++)
    {
        auto job = jobs[std::rand() % JOBS];
        schedule.push_back(job.get());

        job->schedule.push_back(priority);
    }

    // init priority queue
    for (auto& job: jobs)
    {
        auto next = job->next();
        q.push_or_update(next, job);
    }

    // tick, each tick get the job with the lowest priority, and update its priority
    for (int i = 0; i < ITERATIONS; i++)
    {
        printf("\rticking %d/%d", i, ITERATIONS);
        const auto &[p, job] = q.top();

        // check if the order is correct
        assert(p == i);
        assert(job.get() == schedule[i]);

        // schedule next
        auto next = job->next();
        assert(next > p);

        q.push_or_update(next, job);
    }
    assert(q.top().first == INT32_MAX);
    return 0;
}