@ -27,7 +27,6 @@ import (
// Queue is a concurrent-safe queue built on doubly linked list and channel.
// Queue is a concurrent-safe queue built on doubly linked list and channel.
type Queue struct {
type Queue struct {
limit int // Limit for queue size.
limit int // Limit for queue size.
length * gtype . Int64 // Queue length.
list * glist . List // Underlying list structure for data maintaining.
list * glist . List // Underlying list structure for data maintaining.
closed * gtype . Bool // Whether queue is closed.
closed * gtype . Bool // Whether queue is closed.
events chan struct { } // Events for data writing.
events chan struct { } // Events for data writing.
@ -45,7 +44,6 @@ const (
func New ( limit ... int ) * Queue {
func New ( limit ... int ) * Queue {
q := & Queue {
q := & Queue {
closed : gtype . NewBool ( ) ,
closed : gtype . NewBool ( ) ,
length : gtype . NewInt64 ( ) ,
}
}
if len ( limit ) > 0 && limit [ 0 ] > 0 {
if len ( limit ) > 0 && limit [ 0 ] > 0 {
q . limit = limit [ 0 ]
q . limit = limit [ 0 ]
@ -62,7 +60,6 @@ func New(limit ...int) *Queue {
// Push pushes the data `v` into the queue.
// Push pushes the data `v` into the queue.
// Note that it would panic if Push is called after the queue is closed.
// Note that it would panic if Push is called after the queue is closed.
func ( q * Queue ) Push ( v interface { } ) {
func ( q * Queue ) Push ( v interface { } ) {
q . length . Add ( 1 )
if q . limit > 0 {
if q . limit > 0 {
q . C <- v
q . C <- v
} else {
} else {
@ -76,9 +73,7 @@ func (q *Queue) Push(v interface{}) {
// Pop pops an item from the queue in FIFO way.
// Pop pops an item from the queue in FIFO way.
// Note that it would return nil immediately if Pop is called after the queue is closed.
// Note that it would return nil immediately if Pop is called after the queue is closed.
func ( q * Queue ) Pop ( ) interface { } {
func ( q * Queue ) Pop ( ) interface { } {
item := <- q . C
return <- q . C
q . length . Add ( - 1 )
return item
}
}
// Close closes the queue.
// Close closes the queue.
@ -101,13 +96,18 @@ func (q *Queue) Close() {
}
}
// Len returns the length of the queue.
// Len returns the length of the queue.
// Note that the result might not be accurate as there's an
// Note that the result might not be accurate if using unlimited queue size as there's an
// asynchronous channel reading the list constantly.
// asynchronous channel reading the list constantly.
func ( q * Queue ) Len ( ) ( length int64 ) {
func ( q * Queue ) Len ( ) ( length int64 ) {
return q . length . Val ( )
bufferedSize := int64 ( len ( q . C ) )
if q . limit > 0 {
return bufferedSize
}
return int64 ( q . list . Size ( ) ) + bufferedSize
}
}
// Size is alias of Len.
// Size is alias of Len.
// Deprecated: use Len instead.
func ( q * Queue ) Size ( ) int64 {
func ( q * Queue ) Size ( ) int64 {
return q . Len ( )
return q . Len ( )
}
}
@ -123,14 +123,11 @@ func (q *Queue) asyncLoopFromListToChannel() {
for ! q . closed . Val ( ) {
for ! q . closed . Val ( ) {
<- q . events
<- q . events
for ! q . closed . Val ( ) {
for ! q . closed . Val ( ) {
if length := q . list . Len ( ) ; length > 0 {
if bufferLength := q . list . Len ( ) ; bufferLength > 0 {
if length > defaultBatchSize {
// When q.C is closed, it will panic here, especially q.C is being blocked for writing.
length = defaultBatchSize
// If any error occurs here, it will be caught by recover and be ignored.
}
for i := 0 ; i < bufferLength ; i ++ {
for _ , v := range q . list . PopFronts ( length ) {
q . C <- q . list . PopFront ( )
// When q.C is closed, it will panic here, especially q.C is being blocked for writing.
// If any error occurs here, it will be caught by recover and be ignored.
q . C <- v
}
}
} else {
} else {
break
break