跳至內容

環形緩衝區

本頁使用了標題或全文手工轉換
維基百科,自由的百科全書
圓形緩衝區的概念圖示。電腦主記憶體是線性位址空間,因此需要採用下述技術來邏輯實現圓形緩衝區

圓形緩衝區(circular buffer),也稱作圓形佇列(circular queue),迴圈緩衝區(cyclic buffer),環形緩衝區(ring buffer),是一種用於表示一個固定尺寸、頭尾相連的緩衝區的資料結構,適合快取資料流

用法

[編輯]

圓形緩衝區的一個有用特性是:當一個資料元素被用掉後,其餘資料元素不需要移動其儲存位置。相反,一個非圓形緩衝區(例如一個普通的佇列)在用掉一個資料元素後,其餘資料元素需要向前搬移。換句話說,圓形緩衝區適合實現先進先出緩衝區,而非圓形緩衝區適合後進先出緩衝區。


圓形緩衝區適合於事先明確了緩衝區的最大容量的情形。擴充一個圓形緩衝區的容量,需要搬移其中的資料。因此一個緩衝區如果需要經常調整其容量,用鏈結串列實現更為合適。

寫操作覆蓋圓形緩衝區中未被處理的資料在某些情況下是允許的。特別是在多媒體處理時。例如,音訊的生產者可以覆蓋掉音效卡尚未來得及處理的音訊資料。

工作過程

[編輯]

一個圓形緩衝區最初為空並有預定的長度。例如,這是一個具有七個元素空間的圓形緩衝區,其中底部的單線與箭頭表示「頭尾相接」形成一個圓形位址空間:

假定1被寫入緩衝區中部(對於圓形緩衝區來說,最初的寫入位置在哪裡是無關緊要的):

再寫入2個元素,分別是2 & 3 — 被追加在1之後:

如果兩個元素被處理,那麼是緩衝區中最老的兩個元素被移除。在本例中,1 & 2被移除,緩衝區中只剩下3:

如果緩衝區中有7個元素,則是滿的:

如果緩衝區是滿的,又要寫入新的資料,一種策略是覆蓋掉最老的資料。此例中,2個新資料— A & B — 寫入,覆蓋了3 & 4:

也可以採取其他策略,禁止覆蓋緩衝區的資料,採取返回一個錯誤碼或者丟擲異常

最終,如果從緩衝區中移除2個資料,不是3 & 4 而是 5 & 6 。因為 A & B 已經覆蓋了3 & 4:

圓形緩衝區工作機制

[編輯]

由於電腦主記憶體是線性位址空間,因此圓形緩衝區需要特別的設計才可以從邏輯上實現。

讀指標與寫指標

[編輯]

一般的,圓形緩衝區需要4個指標:

  • 在主記憶體中實際開始位置;
  • 在主記憶體中實際結束位置,也可以用緩衝區長度代替;
  • 儲存在緩衝區中的有效資料的開始位置(讀指標);
  • 儲存在緩衝區中的有效資料的結尾位置(寫指標)。

讀指標、寫指標可以用整型值來表示。

下例為一個未滿的緩衝區的讀寫指標:

下例為一個滿的緩衝區的讀寫指標:

區分緩衝區滿或者空

[編輯]

緩衝區是滿、或是空,都有可能出現讀指標與寫指標指向同一位置:

有多種策略用於檢測緩衝區是滿、或是空.

總是保持一個儲存單元為空

[編輯]

緩衝區中總是有一個儲存單元保持未使用狀態。緩衝區最多存入個資料。如果讀寫指標指向同一位置,則緩衝區為空。如果讀指標位於寫指標的相鄰後一個位置,則緩衝區為滿。這種策略的優點是簡單、粗暴;缺點是語意上實際可存資料量與緩衝區容量不一致,測試緩衝區是否滿需要做取餘數計算。

使用資料計數

[編輯]

這種策略不使用顯式的寫指標,而是保持著緩衝區主記憶體儲的資料的計數。因此測試緩衝區是空是滿非常簡單;對效能影響可以忽略。缺點是讀寫操作都需要修改這個儲存資料計數,對於多執行緒訪問緩衝區需要並行控制

鏡像指示位

[編輯]

緩衝區的長度如果是n,邏輯位址空間則為0至n-1;那麼,規定n至2n-1為鏡像邏輯位址空間。本策略規定讀寫指標的位址空間為0至2n-1,其中低半部分對應於常規的邏輯位址空間,高半部分對應於鏡像邏輯位址空間。當指標值大於等於2n時,使其折返(wrapped)到ptr-2n。使用一位表示寫指標或讀指標是否進入了虛擬的鏡像儲存區:置位表示進入,不置位表示沒進入還在基本儲存區。

Circular buffer - mirror solution full and empty

在讀寫指標的值相同情況下,如果二者的指示位相同,說明緩衝區為空;如果二者的指示位不同,說明緩衝區為滿。這種方法優點是測試緩衝區滿/空很簡單;不需要做取餘數操作;讀寫執行緒可以分別設計專用演算法策略,能實現精緻的並行控制。 缺點是讀寫指標各需要額外的一位作為指示位。

如果緩衝區長度是2的,則本方法可以省略鏡像指示位。如果讀寫指標的值相等,則緩衝區為空;如果讀寫指標相差n,則緩衝區為滿,這可以用條件表達式(寫指標 == (讀指標 互斥或 緩衝區長度))來判斷。

// This approach adds one bit to end and start pointers
// Circular buffer object
typedef struct {
    int size; // maximum number of elements
    int start; // index of oldest element
    int end; // index at which to write new element
    ElemType *elems; // vector of elements
} CircularBuffer;

void cbInit(CircularBuffer *cb, int size) {
    cb->size = size;
    cb->start = 0;
    cb->end = 0;
    cb->elems = (ElemType *)calloc(cb->size, sizeof(ElemType));
}

void cbPrint(CircularBuffer *cb) {
    printf("size = 0x%x, start = %d, end = %d\n", cb->size, cb->start, cb->end);
}

int cbIsFull(CircularBuffer *cb) {
    return cb->end == (cb->start ^ cb->size); // This inverts the most significant bit of start before comparison
}

int cbIsEmpty(CircularBuffer *cb) {
    return cb->end == cb->start;
}

int cbIncr(CircularBuffer *cb, int p) {
    return (p + 1) & (2 * cb->size - 1); // start and end pointers incrementation is done modulo 2*size
}

void cbWrite(CircularBuffer *cb, ElemType *elem) {
    cb->elems[cb->end & (cb->size - 1)] = *elem;
    if (cbIsFull(cb)) // full, overwrite moves start pointer
        cb->start = cbIncr(cb, cb->start);
    cb->end = cbIncr(cb, cb->end);
}

void cbRead(CircularBuffer *cb, ElemType *elem) {
    *elem = cb->elems[cb->start & (cb->size - 1)];
    cb->start = cbIncr(cb, cb->start);
}

讀/寫 計數

[編輯]

用兩個有符號整型變數分別儲存寫入、讀出緩衝區的資料數量。其差值就是緩衝區中尚未被處理的有效資料的數量。這種方法的優點是讀執行緒、寫執行緒互不干擾;缺點是需要額外兩個變數。

記錄最後的操作

[編輯]

使用一位記錄最後一次操作是讀還是寫。讀寫指標值相等情況下,如果最後一次操作為寫入,那麼緩衝區是滿的;如果最後一次操作為讀出,那麼緩衝區是空。 這種策略的缺點是讀寫操作共享一個標誌位,多執行緒時需要並行控制。

POSIX最佳化實現

[編輯]
#include <sys/mman.h>
#include <stdlib.h>
#include <unistd.h>

#define report_exceptional_condition() abort ()

struct ring_buffer {
    void *address;
    unsigned long count_bytes;
    unsigned long write_offset_bytes;
    unsigned long read_offset_bytes;
};

// Warning order should be at least 12 for Linux
void ring_buffer_create (struct ring_buffer *buffer, unsigned long order) {
    char path[] = "/dev/shm/ring-buffer-XXXXXX";
    int file_descriptor;
    void *address;
    int status;
    file_descriptor = mkstemp(path);
    if (file_descriptor < 0)
        report_exceptional_condition();
    status = unlink(path);
    if (status)
        report_exceptional_condition();
    buffer->count_bytes = 1UL << order;
    buffer->write_offset_bytes = 0;
    buffer->read_offset_bytes = 0;
    status = ftruncate(file_descriptor, buffer->count_bytes);
    if (status)
        report_exceptional_condition();
    buffer->address = mmap (NULL, buffer->count_bytes << 1, PROT_NONE,
                            MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
    if (buffer->address == MAP_FAILED)
        report_exceptional_condition();
    address =
        mmap(buffer->address, buffer->count_bytes, PROT_READ | PROT_WRITE,
             MAP_FIXED | MAP_SHARED, file_descriptor, 0);
    if (address != buffer->address)
        report_exceptional_condition();
    address = mmap(buffer->address + buffer->count_bytes,
                   buffer->count_bytes, PROT_READ | PROT_WRITE,
                   MAP_FIXED | MAP_SHARED, file_descriptor, 0);
    if (address != buffer->address + buffer->count_bytes)
        report_exceptional_condition();
    status = close(file_descriptor);
    if (status)
        report_exceptional_condition();
}

void ring_buffer_free(struct ring_buffer *buffer) {
    int status;
    status = munmap(buffer->address, buffer->count_bytes << 1);
    if (status)
        report_exceptional_condition ();
}

void *ring_buffer_write_address(struct ring_buffer *buffer) {
    // void pointer arithmetic is a constraint violation.
    return buffer->address + buffer->write_offset_bytes;
}

void ring_buffer_write_advance(struct ring_buffer *buffer, unsigned long count_bytes) {
    buffer->write_offset_bytes += count_bytes;
}

void *ring_buffer_read_address(struct ring_buffer *buffer) {
    return buffer->address + buffer->read_offset_bytes;
}

void ring_buffer_read_advance(struct ring_buffer *buffer, unsigned long count_bytes) {
    buffer->read_offset_bytes += count_bytes;
    if (buffer->read_offset_bytes >= buffer->count_bytes) {
        // 如果读指针大于等于缓冲区长度,那些读写指针同时折返回[0, buffer_size]范围内
        buffer->read_offset_bytes -= buffer->count_bytes;
        buffer->write_offset_bytes -= buffer->count_bytes;
    }
}

unsigned long ring_buffer_count_bytes(struct ring_buffer *buffer) {
    return buffer->write_offset_bytes - buffer->read_offset_bytes;
}

unsigned long ring_buffer_count_free_bytes(struct ring_buffer *buffer) {
    return buffer->count_bytes - ring_buffer_count_bytes (buffer);
}

void ring_buffer_clear(struct ring_buffer *buffer) {
    buffer->write_offset_bytes = 0;
    buffer->read_offset_bytes = 0;
}

/*  Note, that initial anonymous mmap() can be avoided - after initial mmap() for descriptor fd,
    you can try mmap() with hinted address as (buffer->address + buffer->count_bytes) and if it fails -
    another one with hinted address as (buffer->address - buffer->count_bytes).
    Make sure MAP_FIXED is not used in such case, as under certain situations it could end with segfault.
    The advantage of such approach is, that it avoids requirement to map twice the amount you need initially
    (especially useful e.g. if you want to use hugetlbfs and the allowed amount is limited)
    and in context of gcc/glibc - you can avoid certain feature macros
    (MAP_ANONYMOUS usually requires one of: _BSD_SOURCE, _SVID_SOURCE or _GNU_SOURCE). */

Linux核心的kfifo

[編輯]

在Linux核心檔案kfifo.h和kfifo.c中,定義了一個先進先出圓形緩衝區實現。如果只有一個讀執行緒、一個寫執行緒,二者沒有共享的被修改的控制變數,那麼可以證明這種情況下不需要並行控制。kfifo就滿足上述條件。kfifo要求緩衝區長度必須為2的冪。讀、寫指標分別是無符號整型變數。把讀寫指標變換為緩衝區內的索引值,僅需要「按位元與」操作:(指標值 按位元與 (緩衝區長度-1))。這避免了計算代價高昂的「求余」操作。且下述關係總是成立:

讀指標 + 緩衝區儲存的資料長度 == 寫指標

即使在寫指標達到了無符號整型的上界,上溢位後寫指標的值小於讀指標的值,上述關係仍然保持成立(這是因為無符號整型加法的性質)。 kfifo的寫操作,首先計算緩衝區中當前可寫入儲存空間的資料長度:

len = min{待寫入資料長度, 緩衝區長度 - (寫指標 - 讀指標)}

然後,分兩段寫入資料。第一段是從寫指標開始向緩衝區末尾方向;第二段是從緩衝區起始處寫入餘下的可寫入資料,這部分可能資料長度為0即並無實際資料寫入。

外部連結

[編輯]