You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
dorm/vendor/github.com/qiniu/qmgo/query.go

415 lines
11 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

/*
Copyright 2020 The Qmgo Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package qmgo
import (
"context"
"fmt"
"reflect"
"github.com/qiniu/qmgo/middleware"
"github.com/qiniu/qmgo/operator"
qOpts "github.com/qiniu/qmgo/options"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/bsoncodec"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// Query accumulates the filter and the optional settings of a single query
// against one collection. The builder methods (Sort, Select, Skip, Limit, ...)
// fill in the optional fields; the terminal methods (One, All, Count,
// Distinct, Cursor, Apply) translate the non-nil ones into driver options.
type Query struct {
// filter is the query document handed to the driver calls.
filter interface{}
// sort is the sort specification; Sort stores a bson.D here.
sort interface{}
// project is the projection set by Select.
project interface{}
// hint is the index hint set by Hint.
hint interface{}
// limit, skip, batchSize and noCursorTimeout are pointers so that nil can
// mean "not set"; each is only forwarded to the driver when non-nil.
limit *int64
skip *int64
batchSize *int64
noCursorTimeout *bool
// collation is the collation set by Collation; nil means none is sent.
collation *options.Collation
// ctx is the context passed to every driver call made by this query.
ctx context.Context
// collection is the underlying driver collection the query runs against.
collection *mongo.Collection
// opts carries qmgo find options; One and All read opts[0].QueryHook to run
// the BeforeQuery/AfterQuery middleware.
opts []qOpts.FindOptions
// registry is the BSON registry Distinct uses to re-encode results; when nil
// Distinct falls back to bson.DefaultRegistry.
registry *bsoncodec.Registry
}
// Collation records the collation to use for this query.
// The receiver itself is updated and returned, so calls can be chained.
func (q *Query) Collation(collation *options.Collation) QueryI {
	q.collation = collation
	return q
}
// NoCursorTimeout records the value for the driver's NoCursorTimeout find option.
// The receiver itself is updated and returned, so calls can be chained.
func (q *Query) NoCursorTimeout(n bool) QueryI {
	// n is a fresh copy per call, so storing its address is safe.
	q.noCursorTimeout = &n
	return q
}
// BatchSize records the batch size for this query, i.e. the maximum number of
// documents the server should include in each batch it returns.
// The receiver itself is updated and returned, so calls can be chained.
func (q *Query) BatchSize(n int64) QueryI {
	// n is a fresh copy per call, so storing its address is safe.
	q.batchSize = &n
	return q
}
// Sort sets the ordering of the returned documents.
//
// Each field is written as "age" or "+age" for ascending order and "-age" for
// descending order. When several fields are given they are applied in the
// order passed, e.g. Sort("age", "-name") sorts by age ascending and then by
// name descending.
//
// Calling Sort with no fields is a no-op. Sort panics if a field resolves to
// an empty name. The receiver itself is updated and returned.
func (q *Query) Sort(fields ...string) QueryI {
	// Nothing to sort by: leave any existing sort untouched. This also avoids
	// storing a nil bson.D, which would not serialize correctly.
	if len(fields) == 0 {
		return q
	}

	order := make(bson.D, 0, len(fields))
	for i := range fields {
		name, direction := SplitSortField(fields[i])
		if name == "" {
			panic("Sort: empty field name")
		}
		order = append(order, bson.E{Key: name, Value: direction})
	}

	q.sort = order
	return q
}
// Select sets the projection, i.e. which fields are included in or excluded
// from the returned documents.
//
// Examples:
//
//	bson.M{"age": 1} returns only the age field
//	bson.M{"age": 0} returns every field except age
//
// Per MongoDB projection rules, _id is returned unless it is explicitly
// excluded with a 0 value.
// The receiver itself is updated and returned, so calls can be chained.
func (q *Query) Select(projection interface{}) QueryI {
	q.project = projection
	return q
}
// Skip makes the query skip the first n matching documents.
// The receiver itself is updated and returned, so calls can be chained.
func (q *Query) Skip(n int64) QueryI {
	// n is a fresh copy per call, so storing its address is safe.
	q.skip = &n
	return q
}
// Hint records the index hint for this query.
// The hint should be either the index name as a string or the index
// specification as a document. When no hint is set (nil), none is sent.
// The receiver itself is updated and returned, so calls can be chained.
func (q *Query) Hint(hint interface{}) QueryI {
	q.hint = hint
	return q
}
// Limit caps the number of documents returned at n.
// A value of 0 means no limit, so every matching document is returned.
// A negative value behaves like the corresponding positive limit, except that
// the cursor is closed after a single batch has been returned.
// Reference https://docs.mongodb.com/manual/reference/method/cursor.limit/index.html
// The receiver itself is updated and returned, so calls can be chained.
func (q *Query) Limit(n int64) QueryI {
	// n is a fresh copy per call, so storing its address is safe.
	q.limit = &n
	return q
}
// One fetches a single document matching the filter and decodes it into result.
//
// Collation, sort, projection, skip and hint are applied when set. If query
// options were supplied, the QueryHook of the first one is run through the
// middleware before the query and again after a successful decode.
// Any error from the hooks, the lookup or the decode is returned as is.
func (q *Query) One(result interface{}) error {
	if len(q.opts) > 0 {
		if hookErr := middleware.Do(q.ctx, q.opts[0].QueryHook, operator.BeforeQuery); hookErr != nil {
			return hookErr
		}
	}

	// Forward only the settings that were explicitly configured.
	findOpt := options.FindOne()
	if q.collation != nil {
		findOpt.SetCollation(q.collation)
	}
	if q.sort != nil {
		findOpt.SetSort(q.sort)
	}
	if q.project != nil {
		findOpt.SetProjection(q.project)
	}
	if q.skip != nil {
		findOpt.SetSkip(*q.skip)
	}
	if q.hint != nil {
		findOpt.SetHint(q.hint)
	}

	if decodeErr := q.collection.FindOne(q.ctx, q.filter, findOpt).Decode(result); decodeErr != nil {
		return decodeErr
	}

	// The after-hook only runs once the document has been decoded.
	if len(q.opts) == 0 {
		return nil
	}
	return middleware.Do(q.ctx, q.opts[0].QueryHook, operator.AfterQuery)
}
// All fetches every document matching the filter and decodes them into result.
// The static type of result must be a pointer to a slice.
//
// Collation, sort, projection, limit, skip, hint, batch size and the
// no-cursor-timeout flag are applied when set. If query options were supplied,
// the QueryHook of the first one is run through the middleware before the
// query and again after the results have been decoded successfully.
func (q *Query) All(result interface{}) error {
	if len(q.opts) > 0 {
		if hookErr := middleware.Do(q.ctx, q.opts[0].QueryHook, operator.BeforeQuery); hookErr != nil {
			return hookErr
		}
	}

	// Forward only the settings that were explicitly configured.
	findOpt := options.Find()
	if q.collation != nil {
		findOpt.SetCollation(q.collation)
	}
	if q.sort != nil {
		findOpt.SetSort(q.sort)
	}
	if q.project != nil {
		findOpt.SetProjection(q.project)
	}
	if q.limit != nil {
		findOpt.SetLimit(*q.limit)
	}
	if q.skip != nil {
		findOpt.SetSkip(*q.skip)
	}
	if q.hint != nil {
		findOpt.SetHint(q.hint)
	}
	if q.batchSize != nil {
		findOpt.SetBatchSize(int32(*q.batchSize))
	}
	if q.noCursorTimeout != nil {
		findOpt.SetNoCursorTimeout(*q.noCursorTimeout)
	}

	// A Find error is not checked here; it is handed to the Cursor wrapper
	// together with the driver cursor, and All is called on that wrapper.
	driverCur, findErr := q.collection.Find(q.ctx, q.filter, findOpt)
	wrapped := Cursor{
		ctx:    q.ctx,
		cursor: driverCur,
		err:    findErr,
	}
	if allErr := wrapped.All(result); allErr != nil {
		return allErr
	}

	if len(q.opts) == 0 {
		return nil
	}
	return middleware.Do(q.ctx, q.opts[0].QueryHook, operator.AfterQuery)
}
// Count returns the number of documents matching the filter.
// Limit and skip are applied to the count when they have been set.
func (q *Query) Count() (n int64, err error) {
	countOpt := options.Count()
	if q.skip != nil {
		countOpt.SetSkip(*q.skip)
	}
	if q.limit != nil {
		countOpt.SetLimit(*q.limit)
	}

	n, err = q.collection.CountDocuments(q.ctx, q.filter, countOpt)
	return n, err
}
// EstimatedCount returns the driver's estimated document count for the
// collection, which is based on collection metadata. The query filter is not
// taken into account.
func (q *Query) EstimatedCount() (n int64, err error) {
	n, err = q.collection.EstimatedDocumentCount(q.ctx)
	return n, err
}
// Distinct collects the unique values of the field key among the documents
// matching the filter and stores them in result.
//
// result must be a pointer to a slice (a pointer to an interface value is also
// accepted); otherwise ErrQueryNotSlicePointer or ErrQueryNotSliceType is
// returned. The values reported by MongoDB are re-encoded with the query's
// registry (bson.DefaultRegistry when none is set) and decoded into result;
// if they do not fit the element type, ErrQueryResultTypeInconsistent is
// returned.
// reference https://docs.mongodb.com/manual/reference/command/distinct/
func (q *Query) Distinct(key string, result interface{}) error {
	target := reflect.ValueOf(result)
	if target.Kind() != reflect.Ptr {
		return ErrQueryNotSlicePointer
	}
	switch target.Elem().Kind() {
	case reflect.Interface, reflect.Slice:
		// acceptable destinations
	default:
		return ErrQueryNotSliceType
	}

	values, err := q.collection.Distinct(q.ctx, key, q.filter, options.Distinct())
	if err != nil {
		return err
	}

	reg := q.registry
	if reg == nil {
		reg = bson.DefaultRegistry
	}

	// Round-trip through BSON so the driver's decoder fills the caller's slice.
	bsonType, encoded, marshalErr := bson.MarshalValueWithRegistry(reg, values)
	if marshalErr != nil {
		fmt.Printf("bson.MarshalValue err: %+v\n", marshalErr)
		return marshalErr
	}

	raw := bson.RawValue{Type: bsonType, Value: encoded}
	if decodeErr := raw.Unmarshal(result); decodeErr != nil {
		fmt.Printf("rawValue.Unmarshal err: %+v\n", decodeErr)
		return ErrQueryResultTypeInconsistent
	}
	return nil
}
// Cursor runs the query and returns a CursorI for iterating over the result
// set. The caller should call Close on the returned cursor when done with it.
//
// Collation, sort, projection, limit, skip, hint, batch size and the
// no-cursor-timeout flag are applied when set, the same set of options that
// All forwards to the driver. An error from the underlying Find call is not
// returned here; it is stored in the returned cursor.
func (q *Query) Cursor() CursorI {
	opt := options.Find()

	// Collation and hint were previously dropped here even though All honours
	// them; forward them so both entry points issue the same find.
	if q.collation != nil {
		opt.SetCollation(q.collation)
	}
	if q.sort != nil {
		opt.SetSort(q.sort)
	}
	if q.project != nil {
		opt.SetProjection(q.project)
	}
	if q.limit != nil {
		opt.SetLimit(*q.limit)
	}
	if q.skip != nil {
		opt.SetSkip(*q.skip)
	}
	if q.hint != nil {
		opt.SetHint(q.hint)
	}
	if q.batchSize != nil {
		opt.SetBatchSize(int32(*q.batchSize))
	}
	if q.noCursorTimeout != nil {
		opt.SetNoCursorTimeout(*q.noCursorTimeout)
	}

	cur, err := q.collection.Find(q.ctx, q.filter, opt)
	return &Cursor{
		ctx:    q.ctx,
		cursor: cur,
		err:    err,
	}
}
// Apply runs a findAndModify-style command: it updates, replaces or removes a
// single document matching the query and atomically returns either the old
// version (the default) or the new version of the document (when
// Change.ReturnNew is true).
//
// The Sort and Select query methods affect the result of Apply. If several
// documents match the query, Sort decides which one is acted upon, and Select
// restricts the fields of the returned document.
//
// When Change.Remove is true, at most one document is deleted and the document
// as it appeared before deletion is returned; if no document is found,
// ErrNoDocuments is returned. Remove takes precedence over Replace.
// When Change.Replace is true, at most one document is replaced; the update
// parameter must be a document and must not contain update operators. If no
// document is found and Change.Upsert is false, ErrNoDocuments is returned.
// When both Change.Replace and Change.Remove are false, at most one document
// is updated; the update parameter must be a document containing update
// operators. If no document is found and Change.Upsert is false,
// ErrNoDocuments is returned.
//
// reference: https://docs.mongodb.com/manual/reference/command/findAndModify/
func (q *Query) Apply(change Change, result interface{}) error {
	switch {
	case change.Remove:
		return q.findOneAndDelete(change, result)
	case change.Replace:
		return q.findOneAndReplace(change, result)
	default:
		return q.findOneAndUpdate(change, result)
	}
}
// findOneAndDelete deletes at most one document matching the filter and
// decodes the deleted document into result. Sort and projection are applied
// when set; change is accepted for symmetry with the other helpers but is not
// consulted.
// reference: https://docs.mongodb.com/manual/reference/method/db.collection.findOneAndDelete/
func (q *Query) findOneAndDelete(change Change, result interface{}) error {
	deleteOpt := options.FindOneAndDelete()
	if q.project != nil {
		deleteOpt.SetProjection(q.project)
	}
	if q.sort != nil {
		deleteOpt.SetSort(q.sort)
	}

	removed := q.collection.FindOneAndDelete(q.ctx, q.filter, deleteOpt)
	return removed.Decode(result)
}
// findOneAndReplace replaces at most one document matching the filter with
// change.Update and decodes the returned document into result. Sort and
// projection are applied when set; change.Upsert enables upserting and
// change.ReturnNew asks for the document as it is after the replacement.
//
// When upserting without ReturnNew, a mongo.ErrNoDocuments result is treated
// as success and nil is returned, leaving result untouched.
// reference: https://docs.mongodb.com/manual/reference/method/db.collection.findOneAndReplace/
func (q *Query) findOneAndReplace(change Change, result interface{}) error {
	replaceOpt := options.FindOneAndReplace()
	if q.project != nil {
		replaceOpt.SetProjection(q.project)
	}
	if q.sort != nil {
		replaceOpt.SetSort(q.sort)
	}
	if change.ReturnNew {
		replaceOpt.SetReturnDocument(options.After)
	}
	if change.Upsert {
		replaceOpt.SetUpsert(true)
	}

	replaced := q.collection.FindOneAndReplace(q.ctx, q.filter, change.Update, replaceOpt)
	err := replaced.Decode(result)

	// An upsert asking for the old document has nothing to return when the
	// document was newly inserted; that is not an error for the caller.
	wantsOldOnUpsert := change.Upsert && !change.ReturnNew
	if wantsOldOnUpsert && err == mongo.ErrNoDocuments {
		return nil
	}
	return err
}
// findOneAndUpdate applies change.Update to at most one document matching the
// filter and decodes the returned document into result. Sort and projection
// are applied when set; change.Upsert enables upserting and change.ReturnNew
// asks for the document as it is after the update.
//
// When upserting without ReturnNew, a mongo.ErrNoDocuments result is treated
// as success and nil is returned, leaving result untouched.
// reference: https://docs.mongodb.com/manual/reference/method/db.collection.findOneAndUpdate/
func (q *Query) findOneAndUpdate(change Change, result interface{}) error {
	updateOpt := options.FindOneAndUpdate()
	if q.project != nil {
		updateOpt.SetProjection(q.project)
	}
	if q.sort != nil {
		updateOpt.SetSort(q.sort)
	}
	if change.ReturnNew {
		updateOpt.SetReturnDocument(options.After)
	}
	if change.Upsert {
		updateOpt.SetUpsert(true)
	}

	updated := q.collection.FindOneAndUpdate(q.ctx, q.filter, change.Update, updateOpt)
	err := updated.Decode(result)

	// An upsert asking for the old document has nothing to return when the
	// document was newly inserted; that is not an error for the caller.
	wantsOldOnUpsert := change.Upsert && !change.ReturnNew
	if wantsOldOnUpsert && err == mongo.ErrNoDocuments {
		return nil
	}
	return err
}