You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
270 lines
6.7 KiB
270 lines
6.7 KiB
package queue
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"log"
|
|
"time"
|
|
)
|
|
|
|
// minimumHeaderSize is the number of spare bytes canInsertAfterTail and
// canInsertBeforeHead insist on keeping free next to a new entry whenever the
// entry does not fill the available gap exactly, so that a padding entry can
// still be written into the leftover space.
// The value is 1 byte blobsize + timestampSizeInBytes + hashSizeInBytes.
// NOTE(review): the timestamp and hash sizes are defined outside this file
// (presumably 8 bytes each) — confirm against the code that stores entries.
const minimumHeaderSize = 17

// leftMarginIndex is the first usable position of the backing array. Bytes
// before it are never used, so index zero can mean "element does not exist in
// the queue", which is useful when reading a slice by index.
const leftMarginIndex = 1
|
|
|
|
var (
|
|
errEmptyQueue = &queueError{"Empty queue"}
|
|
errInvalidIndex = &queueError{"Index must be greater than zero. Invalid index."}
|
|
errIndexOutOfBounds = &queueError{"Index out of range"}
|
|
)
|
|
|
|
// BytesQueue is a FIFO queue whose entries are stored back to back in a single
// byte array. It is not safe for concurrent use.
// Every push returns the index of the stored entry; that index can be used to
// read the entry again later.
type BytesQueue struct {
	// full is set by push when the tail catches up with the head and is
	// cleared by Pop, Reset and allocateAdditionalMemory.
	full bool
	// array is the backing storage for all entries.
	array []byte
	// capacity is the size of array in bytes.
	capacity int
	// maxCapacity caps how large capacity may grow; it is only enforced when
	// it is greater than zero.
	maxCapacity int
	// head is the index of the oldest entry (the next one Pop returns).
	head int
	// tail is the index at which the next entry will be written.
	tail int
	// count is the number of entries currently stored.
	count int
	// rightMargin marks the end of the written region of array; when head
	// reaches it, Pop wraps head back to leftMarginIndex.
	rightMargin int
	// headerBuffer is scratch space used to uvarint-encode an entry's size.
	headerBuffer []byte
	// verbose enables logging of memory (re)allocations.
	verbose bool
}
|
|
|
|
// queueError is the error type used by the queue; it carries only a
// human-readable message.
type queueError struct {
	// message is the text reported by Error.
	message string
}
|
|
|
|
// getNeededSize returns the number of bytes an entry with a payload of the
// given length occupies in the queue: the payload plus its size header.
//
// The header stores the total size (payload + header) as a uvarint, and a
// uvarint of h bytes can hold values up to 1<<(7*h) - 1. A header of h bytes
// is therefore enough as long as length+h <= 1<<(7*h) - 1, i.e.
// length < 1<<(7*h) - h. Anything that does not fit in 4 header bytes is
// given a 5-byte header.
func getNeededSize(length int) int {
	for header := 1; header <= 4; header++ {
		limit := 1<<uint(7*header) - header
		if length < limit {
			return length + header
		}
	}
	return length + 5
}
|
|
|
|
// NewBytesQueue initialize new bytes queue.
|
|
// capacity is used in bytes array allocation
|
|
// When verbose flag is set then information about memory allocation are printed
|
|
func NewBytesQueue(capacity int, maxCapacity int, verbose bool) *BytesQueue {
|
|
return &BytesQueue{
|
|
array: make([]byte, capacity),
|
|
capacity: capacity,
|
|
maxCapacity: maxCapacity,
|
|
headerBuffer: make([]byte, binary.MaxVarintLen32),
|
|
tail: leftMarginIndex,
|
|
head: leftMarginIndex,
|
|
rightMargin: leftMarginIndex,
|
|
verbose: verbose,
|
|
}
|
|
}
|
|
|
|
// Reset removes all entries from queue
|
|
func (q *BytesQueue) Reset() {
|
|
// Just reset indexes
|
|
q.tail = leftMarginIndex
|
|
q.head = leftMarginIndex
|
|
q.rightMargin = leftMarginIndex
|
|
q.count = 0
|
|
q.full = false
|
|
}
|
|
|
|
// Push copies entry at the end of queue and moves tail pointer. Allocates more space if needed.
|
|
// Returns index for pushed data or error if maximum size queue limit is reached.
|
|
func (q *BytesQueue) Push(data []byte) (int, error) {
|
|
neededSize := getNeededSize(len(data))
|
|
|
|
if !q.canInsertAfterTail(neededSize) {
|
|
if q.canInsertBeforeHead(neededSize) {
|
|
q.tail = leftMarginIndex
|
|
} else if q.capacity+neededSize >= q.maxCapacity && q.maxCapacity > 0 {
|
|
return -1, &queueError{"Full queue. Maximum size limit reached."}
|
|
} else {
|
|
q.allocateAdditionalMemory(neededSize)
|
|
}
|
|
}
|
|
|
|
index := q.tail
|
|
|
|
q.push(data, neededSize)
|
|
|
|
return index, nil
|
|
}
|
|
|
|
func (q *BytesQueue) allocateAdditionalMemory(minimum int) {
|
|
start := time.Now()
|
|
if q.capacity < minimum {
|
|
q.capacity += minimum
|
|
}
|
|
q.capacity = q.capacity * 2
|
|
if q.capacity > q.maxCapacity && q.maxCapacity > 0 {
|
|
q.capacity = q.maxCapacity
|
|
}
|
|
|
|
oldArray := q.array
|
|
q.array = make([]byte, q.capacity)
|
|
|
|
if leftMarginIndex != q.rightMargin {
|
|
copy(q.array, oldArray[:q.rightMargin])
|
|
|
|
if q.tail <= q.head {
|
|
if q.tail != q.head {
|
|
// created slice is slightly larger then need but this is fine after only the needed bytes are copied
|
|
q.push(make([]byte, q.head-q.tail), q.head-q.tail)
|
|
}
|
|
|
|
q.head = leftMarginIndex
|
|
q.tail = q.rightMargin
|
|
}
|
|
}
|
|
|
|
q.full = false
|
|
|
|
if q.verbose {
|
|
log.Printf("Allocated new queue in %s; Capacity: %d \n", time.Since(start), q.capacity)
|
|
}
|
|
}
|
|
|
|
func (q *BytesQueue) push(data []byte, len int) {
|
|
headerEntrySize := binary.PutUvarint(q.headerBuffer, uint64(len))
|
|
q.copy(q.headerBuffer, headerEntrySize)
|
|
|
|
q.copy(data, len-headerEntrySize)
|
|
|
|
if q.tail > q.head {
|
|
q.rightMargin = q.tail
|
|
}
|
|
if q.tail == q.head {
|
|
q.full = true
|
|
}
|
|
|
|
q.count++
|
|
}
|
|
|
|
func (q *BytesQueue) copy(data []byte, len int) {
|
|
q.tail += copy(q.array[q.tail:], data[:len])
|
|
}
|
|
|
|
// Pop reads the oldest entry from queue and moves head pointer to the next one
|
|
func (q *BytesQueue) Pop() ([]byte, error) {
|
|
data, blockSize, err := q.peek(q.head)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
q.head += blockSize
|
|
q.count--
|
|
|
|
if q.head == q.rightMargin {
|
|
q.head = leftMarginIndex
|
|
if q.tail == q.rightMargin {
|
|
q.tail = leftMarginIndex
|
|
}
|
|
q.rightMargin = q.tail
|
|
}
|
|
|
|
q.full = false
|
|
|
|
return data, nil
|
|
}
|
|
|
|
// Peek reads the oldest entry from list without moving head pointer
|
|
func (q *BytesQueue) Peek() ([]byte, error) {
|
|
data, _, err := q.peek(q.head)
|
|
return data, err
|
|
}
|
|
|
|
// Get reads entry from index
|
|
func (q *BytesQueue) Get(index int) ([]byte, error) {
|
|
data, _, err := q.peek(index)
|
|
return data, err
|
|
}
|
|
|
|
// CheckGet checks if an entry can be read from index
|
|
func (q *BytesQueue) CheckGet(index int) error {
|
|
return q.peekCheckErr(index)
|
|
}
|
|
|
|
// Capacity returns number of allocated bytes for queue
|
|
func (q *BytesQueue) Capacity() int {
|
|
return q.capacity
|
|
}
|
|
|
|
// Len returns number of entries kept in queue
|
|
func (q *BytesQueue) Len() int {
|
|
return q.count
|
|
}
|
|
|
|
// Error returns error message
|
|
func (e *queueError) Error() string {
|
|
return e.message
|
|
}
|
|
|
|
// peekCheckErr is identical to peek, but does not actually return any data
|
|
func (q *BytesQueue) peekCheckErr(index int) error {
|
|
|
|
if q.count == 0 {
|
|
return errEmptyQueue
|
|
}
|
|
|
|
if index <= 0 {
|
|
return errInvalidIndex
|
|
}
|
|
|
|
if index >= len(q.array) {
|
|
return errIndexOutOfBounds
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// peek returns the data from index and the number of bytes to encode the length of the data in uvarint format
|
|
func (q *BytesQueue) peek(index int) ([]byte, int, error) {
|
|
err := q.peekCheckErr(index)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
blockSize, n := binary.Uvarint(q.array[index:])
|
|
return q.array[index+n : index+int(blockSize)], int(blockSize), nil
|
|
}
|
|
|
|
// canInsertAfterTail returns true if it's possible to insert an entry of size of need after the tail of the queue
|
|
func (q *BytesQueue) canInsertAfterTail(need int) bool {
|
|
if q.full {
|
|
return false
|
|
}
|
|
if q.tail >= q.head {
|
|
return q.capacity-q.tail >= need
|
|
}
|
|
// 1. there is exactly need bytes between head and tail, so we do not need
|
|
// to reserve extra space for a potential empty entry when realloc this queue
|
|
// 2. still have unused space between tail and head, then we must reserve
|
|
// at least headerEntrySize bytes so we can put an empty entry
|
|
return q.head-q.tail == need || q.head-q.tail >= need+minimumHeaderSize
|
|
}
|
|
|
|
// canInsertBeforeHead returns true if it's possible to insert an entry of size of need before the head of the queue
|
|
func (q *BytesQueue) canInsertBeforeHead(need int) bool {
|
|
if q.full {
|
|
return false
|
|
}
|
|
if q.tail >= q.head {
|
|
return q.head-leftMarginIndex == need || q.head-leftMarginIndex >= need+minimumHeaderSize
|
|
}
|
|
return q.head-q.tail == need || q.head-q.tail >= need+minimumHeaderSize
|
|
}
|