You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
go-library/vendor/github.com/tencentyun/cos-go-sdk-v5/object_select.go

446 lines
12 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package cos
import (
"bytes"
"context"
"encoding/binary"
"encoding/xml"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"net/http"
"os"
"time"
)
// JSONInputSerialization configures JSON input for a select request. It is
// encoded as the <JSON> child of <InputSerialization>.
type JSONInputSerialization struct {
// Type is omitted from the XML when empty.
// NOTE(review): presumably "DOCUMENT" or "LINES" per the COS Select API — confirm.
Type string `xml:"Type,omitempty"`
}
// CSVInputSerialization configures CSV input for a select request. It is
// encoded as the <CSV> child of <InputSerialization>; every field carries
// omitempty and is left out of the XML when empty.
type CSVInputSerialization struct {
RecordDelimiter string `xml:"RecordDelimiter,omitempty"`
FieldDelimiter string `xml:"FieldDelimiter,omitempty"`
QuoteCharacter string `xml:"QuoteCharacter,omitempty"`
QuoteEscapeCharacter string `xml:"QuoteEscapeCharacter,omitempty"`
// AllowQuotedRecordDelimiter is sent as a string value, not a bool.
AllowQuotedRecordDelimiter string `xml:"AllowQuotedRecordDelimiter,omitempty"`
// NOTE(review): FileHeaderInfo presumably takes NONE/IGNORE/USE — confirm against the COS docs.
FileHeaderInfo string `xml:"FileHeaderInfo,omitempty"`
Comments string `xml:"Comments,omitempty"`
}
// SelectInputSerialization describes the format of the object being queried.
// ObjectSelectOptions encodes it as the <InputSerialization> element.
// NOTE(review): presumably exactly one of CSV or JSON should be set — confirm.
type SelectInputSerialization struct {
// CompressionType is omitted when empty.
// NOTE(review): presumably NONE/GZIP/BZIP2 — confirm.
CompressionType string `xml:"CompressionType,omitempty"`
CSV *CSVInputSerialization `xml:"CSV,omitempty"`
JSON *JSONInputSerialization `xml:"JSON,omitempty"`
}
// JSONOutputSerialization requests JSON output. It is encoded as the <JSON>
// child of <OutputSerialization>.
type JSONOutputSerialization struct {
// RecordDelimiter is omitted from the XML when empty.
RecordDelimiter string `xml:"RecordDelimiter,omitempty"`
}
// CSVOutputSerialization requests CSV output. It is encoded as the <CSV> child
// of <OutputSerialization>; every field carries omitempty and is left out of
// the XML when empty.
type CSVOutputSerialization struct {
// NOTE(review): QuoteFields presumably takes ALWAYS/ASNEEDED — confirm.
QuoteFields string `xml:"QuoteFields,omitempty"`
RecordDelimiter string `xml:"RecordDelimiter,omitempty"`
FieldDelimiter string `xml:"FieldDelimiter,omitempty"`
QuoteCharacter string `xml:"QuoteCharacter,omitempty"`
QuoteEscapeCharacter string `xml:"QuoteEscapeCharacter,omitempty"`
}
// SelectOutputSerialization selects the format of the returned records.
// ObjectSelectOptions encodes it as the <OutputSerialization> element.
// NOTE(review): presumably exactly one of CSV or JSON should be set — confirm.
type SelectOutputSerialization struct {
CSV *CSVOutputSerialization `xml:"CSV,omitempty"`
JSON *JSONOutputSerialization `xml:"JSON,omitempty"`
}
// ObjectSelectOptions is the request body of a select call. Its XML tags
// produce a <SelectRequest> document, and Select passes it as the body of the
// POST request. Field order here is the element order in the XML.
type ObjectSelectOptions struct {
XMLName xml.Name `xml:"SelectRequest"`
// Expression and ExpressionType have no omitempty, so they are always emitted.
Expression string `xml:"Expression"`
// NOTE(review): ExpressionType is presumably "SQL" — confirm.
ExpressionType string `xml:"ExpressionType"`
InputSerialization *SelectInputSerialization `xml:"InputSerialization"`
OutputSerialization *SelectOutputSerialization `xml:"OutputSerialization"`
// RequestProgress is written nested as <RequestProgress><Enabled>…</Enabled></RequestProgress>;
// omitempty drops it when empty.
// NOTE(review): presumably "TRUE"/"FALSE" — confirm.
RequestProgress string `xml:"RequestProgress>Enabled,omitempty"`
}
// Select runs a select query against the object name and returns the result
// as a stream.
//
// The request is sent as POST /<name>?select&select-type=2 with opt as the
// body. The HTTP response body is handed over to the returned reader
// (disableCloseBody is set), which is an *ObjectSelectResponse: each Read
// decodes the framed event stream and yields only record data. The caller must
// Close the returned reader.
func (s *ObjectService) Select(ctx context.Context, name string, opt *ObjectSelectOptions) (io.ReadCloser, error) {
	resp, err := s.client.send(ctx, &sendOptions{
		baseURL:          s.client.BaseURL.BucketURL,
		uri:              fmt.Sprintf("/%s?select&select-type=2", encodeURIComponent(name)),
		method:           http.MethodPost,
		body:             opt,
		disableCloseBody: true,
	})
	if err != nil {
		return nil, err
	}

	// Parsing starts in the "read a new frame" state with an empty CRC
	// accumulator.
	initial := &ObjectSelectResult{
		NextFrame: true,
		Payload:   []byte{},
	}
	return &ObjectSelectResponse{
		StatusCode: resp.StatusCode,
		Headers:    resp.Header,
		Body:       resp.Body,
		Frame:      initial,
		Finish:     false,
	}, nil
}
// SelectToFile runs a select query against the object name and writes the
// decoded record data to the local path file. The file is created or truncated
// with mode 0664.
//
// If the request itself fails, it returns nil and the error. Otherwise the
// returned *ObjectSelectResponse (carrying the HTTP status code and headers)
// is non-nil even when an error is also returned. On return the response body
// has been closed. If the file could not be opened, the remaining select
// stream is first read and discarded.
func (s *ObjectService) SelectToFile(ctx context.Context, name, file string, opt *ObjectSelectOptions) (*ObjectSelectResponse, error) {
	resp, err := s.Select(ctx, name, opt)
	if err != nil {
		return nil, err
	}
	res, _ := resp.(*ObjectSelectResponse)
	defer func() {
		io.Copy(ioutil.Discard, resp)
		resp.Close()
	}()

	fd, err := os.OpenFile(file, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.FileMode(0664))
	if err != nil {
		return res, err
	}
	_, err = io.Copy(fd, resp)
	// Close can report deferred write failures; surface them unless the copy
	// already failed, so a partially written file is never reported as
	// success.
	if cerr := fd.Close(); err == nil {
		err = cerr
	}
	// Mark the stream finished so the deferred drain does not read further.
	res.Finish = true
	return res, err
}
// Frame-reading configuration and the event-stream header names consulted
// while parsing select responses.
const (
	// kReadTimeout is the per-read timeout, in seconds, passed to
	// fixedLengthRead.
	kReadTimeout = 3
	kMessageType = ":message-type"
	kEventType   = ":event-type"
	kContentType = ":content-type"
)

// Frame type identifiers stored in ObjectSelectResult.FrameType.
//
// The values start at 4 for compatibility: FrameType is an exported field, and
// these are the numbers it has always reported. The offset is now explicit
// instead of depending on how many unrelated constants precede these in a
// shared block. It also means the zero value of FrameType matches no frame
// type.
const (
	kRecordsFrameType = iota + 4
	kContinuationFrameType
	kProgressFrameType
	kStatsFrameType
	kEndFrameType
	kErrorFrameType
)
// ProgressFrame holds the counters decoded from the XML payload (root element
// <Progress>) of a Progress event frame. readFrames stores the most recent one
// in ObjectSelectResult.ProgressFrame.
type ProgressFrame struct {
XMLName xml.Name `xml:"Progress"`
BytesScanned int `xml:"BytesScanned"`
BytesProcessed int `xml:"BytesProcessed"`
BytesReturned int `xml:"BytesReturned"`
}
// StatsFrame holds the counters decoded from the XML payload (root element
// <Stats>) of a Stats event frame. readFrames stores it in
// ObjectSelectResult.StatsFrame.
type StatsFrame struct {
XMLName xml.Name `xml:"Stats"`
BytesScanned int `xml:"BytesScanned"`
BytesProcessed int `xml:"BytesProcessed"`
BytesReturned int `xml:"BytesReturned"`
}
// DataFrame tracks the Records frame currently being read.
type DataFrame struct {
// ContentType is copied from the frame's ":content-type" header when present.
ContentType string
// ConsumedBytesLength counts the record payload bytes of the current frame
// already copied out to the reader; it is reset to 0 once the frame is complete.
ConsumedBytesLength int32
// NOTE(review): LeftBytesLength is not referenced by the frame-parsing code
// here and appears unused.
LeftBytesLength int32
}
// ErrorFrame is the error reported by the service in an error-type frame of a
// select response. It implements the error interface.
type ErrorFrame struct {
	// Code is taken from the ":error-code" frame header.
	Code string
	// Message is taken from the ":error-message" frame header.
	Message string
}

// Error formats the frame as "Error Code: <code>, Error Message: <message>".
func (e *ErrorFrame) Error() string {
	// Sprint inserts no separators between string operands.
	return fmt.Sprint("Error Code: ", e.Code, ", Error Message: ", e.Message)
}
// ObjectSelectResult is the parsing state of the select event stream. Its
// fields are overwritten as each frame is read.
type ObjectSelectResult struct {
// TotalFrameLength and TotalHeaderLength come from the 12-byte frame prelude.
TotalFrameLength int32
TotalHeaderLength int32
// NextFrame is true when the next read must begin by parsing a new prelude.
NextFrame bool
// FrameType is one of the k*FrameType constants, set from the frame headers.
FrameType int
// Payload accumulates the raw bytes of the current frame so its trailing
// CRC32 can be verified; it is reset after each frame's checksum.
Payload []byte
DataFrame DataFrame
// ProgressFrame and StatsFrame hold the most recently decoded Progress/Stats payloads.
ProgressFrame ProgressFrame
StatsFrame StatsFrame
// ErrorFrame is set when the server sends an error frame; once set, every
// subsequent read returns it.
ErrorFrame *ErrorFrame
}
// ObjectSelectResponse is the io.ReadCloser returned by Select. Reading from it
// yields the decoded record data of the select event stream.
type ObjectSelectResponse struct {
// StatusCode and Headers are copied from the HTTP response.
StatusCode int
Headers http.Header
// Body is the raw HTTP response body carrying the framed event stream.
Body io.ReadCloser
// Frame holds the parsing state of the frame currently being read.
Frame *ObjectSelectResult
// Finish is set once the End frame has been processed (or by SelectToFile
// after copying); afterwards Read returns io.EOF.
Finish bool
}
// Read implements io.Reader. It fills p with record data decoded from the
// select event stream (the frame handling lives in readFrames). It returns
// io.EOF after the End frame, and an *ErrorFrame if the server reported an
// error.
func (osr *ObjectSelectResponse) Read(p []byte) (int, error) {
	return osr.readFrames(p)
}

// Close implements io.Closer by closing the underlying HTTP response body.
func (osr *ObjectSelectResponse) Close() error {
	body := osr.Body
	return body.Close()
}
// readFrames fills p with record data decoded from the select event stream,
// consuming as many frames as needed. Continuation, Progress and Stats frames
// are verified (Progress and Stats are also decoded into osr.Frame) but add no
// bytes to p.
//
// It returns one of:
//   - len(p), nil when p has been filled;
//   - the bytes read so far and io.EOF once the End frame has been processed.
//     Later calls keep returning io.EOF;
//   - the bytes read so far and an error. A server-sent error frame is
//     returned as *ErrorFrame, and later calls keep returning it.
//
// A well-formed stream always ends with an End frame. If the body runs out
// before that point, the io.EOF reported by the reading helpers is converted
// to io.ErrUnexpectedEOF. Otherwise callers such as io.Copy would accept a
// truncated response as complete.
func (osr *ObjectSelectResponse) readFrames(p []byte) (int, error) {
	if osr.Finish {
		return 0, io.EOF
	}
	if osr.Frame.ErrorFrame != nil {
		return 0, osr.Frame.ErrorFrame
	}

	var nlen int
	// fail reports the progress made so far with err, treating a bare io.EOF
	// as premature because the End frame has not been seen yet.
	fail := func(err error) (int, error) {
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		return nlen, err
	}

	for nlen < len(p) {
		if osr.Frame.NextFrame {
			osr.Frame.NextFrame = false
			if err := osr.analysisPrelude(); err != nil {
				return fail(err)
			}
			if err := osr.analysisHeader(); err != nil {
				return fail(err)
			}
		}
		switch osr.Frame.FrameType {
		case kRecordsFrameType:
			n, err := osr.analysisRecords(p[nlen:])
			if err != nil {
				return fail(err)
			}
			nlen += n
		case kContinuationFrameType:
			if err := osr.payloadChecksum("ContinuationFrame"); err != nil {
				return fail(err)
			}
		case kProgressFrameType:
			if err := osr.analysisXml(&osr.Frame.ProgressFrame); err != nil {
				return fail(err)
			}
		case kStatsFrameType:
			if err := osr.analysisXml(&osr.Frame.StatsFrame); err != nil {
				return fail(err)
			}
		case kEndFrameType:
			// Not routed through fail: the End frame is legitimately the last
			// thing in the body, and the reader may report io.EOF together
			// with its final CRC bytes.
			if err := osr.payloadChecksum("EndFrame"); err != nil {
				return nlen, err
			}
			osr.Finish = true
			return nlen, io.EOF
		case kErrorFrameType:
			return nlen, osr.Frame.ErrorFrame
		}
	}
	return nlen, nil
}
// analysisPrelude reads the 12-byte prelude that starts every frame. The
// prelude holds the total frame length and the total header length (both
// big-endian int32), followed by a CRC32 of those first 8 bytes.
//
// The two lengths are stored in osr.Frame, and the raw prelude bytes are
// appended to osr.Frame.Payload, which accumulates the whole frame for its
// trailing CRC check. It returns an error if the read fails, if the prelude
// CRC does not match, or if the lengths are inconsistent.
func (osr *ObjectSelectResponse) analysisPrelude() error {
	prelude := make([]byte, 12)
	if _, err := osr.fixedLengthRead(prelude, kReadTimeout); err != nil {
		return err
	}
	osr.Frame.TotalFrameLength = int32(binary.BigEndian.Uint32(prelude[0:4]))
	osr.Frame.TotalHeaderLength = int32(binary.BigEndian.Uint32(prelude[4:8]))
	preludeCRC := binary.BigEndian.Uint32(prelude[8:12])
	osr.Frame.Payload = append(osr.Frame.Payload, prelude...)
	if err := checksum(prelude[0:8], preludeCRC, "Prelude"); err != nil {
		return err
	}

	// Later stages compute the payload length as
	// TotalFrameLength - 16 - TotalHeaderLength, where 16 is this 12-byte
	// prelude plus the 4-byte trailing CRC. Reject lengths that would make
	// that negative: slicing or allocating with a negative length panics.
	// The check uses int64 so the sum cannot overflow.
	frameLen := int64(osr.Frame.TotalFrameLength)
	headerLen := int64(osr.Frame.TotalHeaderLength)
	if headerLen < 0 || frameLen < 16+headerLen {
		return fmt.Errorf("invalid frame prelude, total length: %d, header length: %d", frameLen, headerLen)
	}
	return nil
}
// analysisHeader reads the header section of the current frame
// (osr.Frame.TotalHeaderLength bytes) and sets osr.Frame.FrameType from it.
//
// Each header is encoded as:
//   - name length (1 byte, unsigned);
//   - name;
//   - value type (1 byte);
//   - value length (2 bytes, big-endian unsigned);
//   - value.
//
// The value-type byte is not inspected; every value is stored as a string.
// All bytes read are appended to osr.Frame.Payload for the frame CRC check.
//
// For an "error" message, the ":error-code" and ":error-message" headers are
// captured in osr.Frame.ErrorFrame. An "event" message must carry a known
// ":event-type"; for Records frames the ":content-type" header, if present,
// is recorded. A missing or unknown message or event type is an error.
func (osr *ObjectSelectResponse) analysisHeader() error {
	var nlen int32
	headers := make(map[string]string)
	for nlen < osr.Frame.TotalHeaderLength {
		// The length prefixes are unsigned on the wire and must be decoded as
		// such. A signed decode turns names over 127 bytes or values over
		// 32767 bytes into negative lengths, and make() panics on those.
		bHeaderNameLen := make([]byte, 1)
		if _, err := osr.fixedLengthRead(bHeaderNameLen, kReadTimeout); err != nil {
			return err
		}
		nlen += 1
		headerNameLen := int(bHeaderNameLen[0])
		osr.Frame.Payload = append(osr.Frame.Payload, bHeaderNameLen...)

		bHeaderName := make([]byte, headerNameLen)
		if _, err := osr.fixedLengthRead(bHeaderName, kReadTimeout); err != nil {
			return err
		}
		nlen += int32(headerNameLen)
		headerName := string(bHeaderName)
		osr.Frame.Payload = append(osr.Frame.Payload, bHeaderName...)

		// Byte 0 is the value type; bytes 1-2 are the value length.
		bValueTypeLen := make([]byte, 3)
		if _, err := osr.fixedLengthRead(bValueTypeLen, kReadTimeout); err != nil {
			return err
		}
		nlen += 3
		headerValueLen := int(binary.BigEndian.Uint16(bValueTypeLen[1:3]))
		osr.Frame.Payload = append(osr.Frame.Payload, bValueTypeLen...)

		bHeaderValue := make([]byte, headerValueLen)
		if _, err := osr.fixedLengthRead(bHeaderValue, kReadTimeout); err != nil {
			return err
		}
		nlen += int32(headerValueLen)
		headers[headerName] = string(bHeaderValue)
		osr.Frame.Payload = append(osr.Frame.Payload, bHeaderValue...)
	}

	htype, ok := headers[kMessageType]
	if !ok {
		return fmt.Errorf("header parse failed, no message-type, headers: %+v\n", headers)
	}
	switch htype {
	case "error":
		osr.Frame.FrameType = kErrorFrameType
		osr.Frame.ErrorFrame = &ErrorFrame{
			Code:    headers[":error-code"],
			Message: headers[":error-message"],
		}
	case "event":
		hevent, ok := headers[kEventType]
		if !ok {
			return fmt.Errorf("header parse failed, no event-type, headers: %+v\n", headers)
		}
		switch hevent {
		case "Records":
			if hContentType, ok := headers[kContentType]; ok {
				osr.Frame.DataFrame.ContentType = hContentType
			}
			osr.Frame.FrameType = kRecordsFrameType
		case "Cont":
			osr.Frame.FrameType = kContinuationFrameType
		case "Progress":
			osr.Frame.FrameType = kProgressFrameType
		case "Stats":
			osr.Frame.FrameType = kStatsFrameType
		case "End":
			osr.Frame.FrameType = kEndFrameType
		default:
			return fmt.Errorf("header parse failed, invalid event-type, headers: %+v\n", headers)
		}
	default:
		return fmt.Errorf("header parse failed, invalid message-type: headers: %+v\n", headers)
	}
	return nil
}
// analysisRecords copies as much of the current Records frame's payload into
// data as fits, reading from the response body. It returns the number of bytes
// copied.
//
// Progress within the frame is tracked in osr.Frame.DataFrame.ConsumedBytesLength,
// so a frame larger than data is delivered across several calls. When the last
// payload byte has been copied, the counter is reset and the frame CRC is
// verified; a checksum failure is returned together with the byte count.
func (osr *ObjectSelectResponse) analysisRecords(data []byte) (int, error) {
	frame := osr.Frame
	// Payload size = total frame - 12-byte prelude - 4-byte CRC - headers.
	payloadLen := frame.TotalFrameLength - 16 - frame.TotalHeaderLength
	want := payloadLen - frame.DataFrame.ConsumedBytesLength
	if avail := int32(len(data)); avail < want {
		want = avail
	}
	chunk := data[:want]

	n, err := osr.fixedLengthRead(chunk, kReadTimeout)
	if err != nil {
		return n, fmt.Errorf("read data frame error: %s", err.Error())
	}
	frame.DataFrame.ConsumedBytesLength += int32(n)
	frame.Payload = append(frame.Payload, chunk...)

	// The whole record payload of this frame has now been handed to data.
	if frame.DataFrame.ConsumedBytesLength == payloadLen {
		frame.DataFrame.ConsumedBytesLength = 0
		err = osr.payloadChecksum("RecordFrame")
	}
	return n, err
}
// analysisXml reads the payload of the current Progress or Stats frame,
// verifies the frame CRC, and then XML-decodes the payload into frame (a
// pointer to the destination struct).
//
// The checksum is verified before decoding. A corrupted payload is therefore
// reported as a checksum failure and is never decoded into frame. Once the CRC
// has been read the frame counts as consumed, so a decode error does not leave
// the stream positioned inside the frame.
func (osr *ObjectSelectResponse) analysisXml(frame interface{}) error {
	payloadLength := osr.Frame.TotalFrameLength - 16 - osr.Frame.TotalHeaderLength
	bFrame := make([]byte, payloadLength)
	if _, err := osr.fixedLengthRead(bFrame, kReadTimeout); err != nil {
		return err
	}
	osr.Frame.Payload = append(osr.Frame.Payload, bFrame...)
	if err := osr.payloadChecksum("XmlFrame"); err != nil {
		return err
	}
	return xml.Unmarshal(bFrame, frame)
}
// payloadChecksum is called once the current frame has been read completely,
// before reading of the next frame begins. It reads the frame's trailing
// 4-byte CRC32 and checks it against the bytes accumulated in
// osr.Frame.Payload. ftype labels the frame in the error message.
//
// If the CRC bytes cannot be read, the error is returned and the state is left
// unchanged. Otherwise, whether or not the CRC matches, the frame is marked
// consumed: NextFrame is set and Payload is reset.
func (osr *ObjectSelectResponse) payloadChecksum(ftype string) error {
	crcBytes := make([]byte, 4)
	if _, err := osr.fixedLengthRead(crcBytes, kReadTimeout); err != nil {
		return err
	}
	var expected uint32
	bytesToInt(crcBytes, &expected)
	verifyErr := checksum(osr.Frame.Payload, expected, ftype)

	osr.Frame.NextFrame = true
	osr.Frame.Payload = []byte{}
	return verifyErr
}
// chanReadIO carries the outcome of the background read in fixedLengthRead
// back over a channel: the number of bytes read and the read error, if any.
type chanReadIO struct {
readLen int
err error
}
// fixedLengthRead reads exactly len(p) bytes from the response body into p,
// giving up after readTimeout seconds.
//
// Results:
//   - Success: len(p), nil, even if the body reported io.EOF together with
//     the last bytes.
//   - Body ends early: follows io.ReadFull, returning the number of bytes read
//     with io.EOF if nothing was read or io.ErrUnexpectedEOF for a partial
//     read.
//   - Timeout: 0 and an error naming the request ID. The position in the
//     stream is then undefined, and the response should be discarded.
//
// The read runs in its own goroutine so it can be abandoned on timeout. That
// goroutine reads into a private buffer, and p is only filled once the read
// has completed. An abandoned read therefore never writes into memory the
// caller may already be reusing. The price is one extra buffer and copy per
// call. The goroutine exits when the underlying Body.Read returns.
func (osr *ObjectSelectResponse) fixedLengthRead(p []byte, readTimeout int64) (int, error) {
	timeout := time.Duration(readTimeout)
	r := osr.Body
	buf := make([]byte, len(p))
	// Buffered so the reader goroutine can always send and exit, even if the
	// timeout fired and nobody receives.
	ch := make(chan chanReadIO, 1)
	go func() {
		n, err := io.ReadFull(r, buf)
		ch <- chanReadIO{readLen: n, err: err}
	}()

	timer := time.NewTimer(time.Second * timeout)
	defer timer.Stop()
	select {
	case <-timer.C:
		return 0, fmt.Errorf("requestId: %s, readLen timeout, timeout is %d(second),need read:%d", osr.Headers.Get("x-cos-request-id"), timeout, len(p))
	case result := <-ch:
		copy(p, buf[:result.readLen])
		return result.readLen, result.err
	}
}
// bytesToInt decodes b as a big-endian value into ret, which must be a
// pointer to a fixed-size type accepted by binary.Read (for example *int8,
// *int16, *int32 or *uint32).
//
// Any error from binary.Read is discarded. The visible callers pass slices
// sized exactly for the target type.
func bytesToInt(b []byte, ret interface{}) {
	src := bytes.NewReader(b)
	_ = binary.Read(src, binary.BigEndian, ret)
}
// checksum verifies that the IEEE CRC-32 of b equals rec. ftype names the part
// being checked (e.g. "Prelude") and appears in the error returned on a
// mismatch, along with the calculated and received values.
func checksum(b []byte, rec uint32, ftype string) error {
	calculated := crc32.ChecksumIEEE(b)
	if calculated == rec {
		return nil
	}
	return fmt.Errorf("parse type: %v, checksum failed, cal: %v, rec: %v\n", ftype, calculated, rec)
}