commit ca4b274a9c7113e6efe795221a12dad9ba1aba58 Author: 李光春 Date: Mon May 23 16:15:25 2022 +0800 - init diff --git a/ .github/workflows/codeql-analysis.yml b/ .github/workflows/codeql-analysis.yml new file mode 100644 index 0000000..75e465f --- /dev/null +++ b/ .github/workflows/codeql-analysis.yml @@ -0,0 +1,38 @@ +name: "CodeQL" + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + schedule: + - cron: '36 15 * * 2' + +jobs: + analyze: + name: Analyze + runs-on: ubuntu-latest + permissions: + actions: read + contents: read + security-events: write + + strategy: + fail-fast: false + matrix: + language: [ 'go' ] + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Initialize CodeQL + uses: github/codeql-action/init@v2 + with: + languages: ${{ matrix.language }} + + - name: Autobuild + uses: github/codeql-action/autobuild@v2 + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v2 diff --git a/ .github/workflows/go.yml b/ .github/workflows/go.yml new file mode 100644 index 0000000..94277cc --- /dev/null +++ b/ .github/workflows/go.yml @@ -0,0 +1,17 @@ +name: Go +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Set up Go + uses: actions/setup-go@v2 + with: + go-version: 1.18 + - name: Test + run: go test -v ./... \ No newline at end of file diff --git a/ .workflow/go.yml b/ .workflow/go.yml new file mode 100644 index 0000000..456a71a --- /dev/null +++ b/ .workflow/go.yml @@ -0,0 +1,27 @@ +version: '1.0' +name: go +displayName: go +triggers: + trigger: auto + pr: + branches: + include: + - master +stages: + - name: compile + displayName: 编译 + strategy: naturally + trigger: auto + steps: + - step: build@golang + name: Test + displayName: Golang 测试 + golangVersion: '1.8' + commands: + - go env -w GO111MODULE=on + - go env -w GOPROXY=https://goproxy.cn,direct + - go test -v ./... + notify: [] +permissions: + - role: admin + members: [] diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 0000000..818c18b --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,40 @@ +# Use the latest 2.1 version of CircleCI pipeline process engine. +# See: https://circleci.com/docs/2.0/configuration-reference +version: 2.1 +# Define a job to be invoked later in a workflow. +# See: https://circleci.com/docs/2.0/configuration-reference/#jobs +jobs: + build: + working_directory: ~/repo + # Specify the execution environment. You can specify an image from Dockerhub or use one of our Convenience Images from CircleCI's Developer Hub. + # See: https://circleci.com/docs/2.0/configuration-reference/#docker-machine-macos-windows-executor + docker: + - image: circleci/golang:1.18 + # Add steps to the job + # See: https://circleci.com/docs/2.0/configuration-reference/#steps + steps: + - checkout + - restore_cache: + keys: + - go-mod-v4-{{ checksum "go.sum" }} + - run: + name: Install Dependencies + command: go mod download + - save_cache: + key: go-mod-v4-{{ checksum "go.sum" }} + paths: + - "/go/pkg/mod" + - run: + name: Run tests + command: | + mkdir -p /tmp/test-reports + gotestsum --junitfile /tmp/test-reports/unit-tests.xml + - store_test_results: + path: /tmp/test-reports +# Invoke jobs via workflows +# See: https://circleci.com/docs/2.0/configuration-reference/#workflows +workflows: + sample: # This is the name of the workflow, feel free to change it to better match your workflow. + # Inside the workflow, you define the jobs you want to run. + jobs: + - build \ No newline at end of file diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..41d1aeb --- /dev/null +++ b/.drone.yml @@ -0,0 +1,11 @@ +kind: pipeline +type: docker +name: clone + +steps: + - name: Test + image: golang:1.18 + commands: + - go env -w GO111MODULE=on + - go env -w GOPROXY=https://goproxy.cn,direct + - go test -v ./... \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..52eef4d --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +.env +.git +.svn +.idea +.vscode +*.log +gomod.sh +/vendor/ +grpc_build.sh \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..b677ad8 --- /dev/null +++ b/README.md @@ -0,0 +1,25 @@ +

+Golang Jobs +

+ +📦 Golang Jobs + +[comment]: <> (go) +[![godoc](https://pkg.go.dev/badge/go.dtapp.net/gojobs?status.svg)](https://pkg.go.dev/go.dtapp.net/gojobs) +[![goproxy.cn](https://goproxy.cn/stats/go.dtapp.net/gojobs/badges/download-count.svg)](https://goproxy.cn/stats/go.dtapp.net/gojobs) +[![goreportcard.com](https://goreportcard.com/badge/go.dtapp.net/gojobs )](https://goreportcard.com/report/go.dtapp.net/gojobs) +[![deps.dev](https://img.shields.io/badge/deps-go-red.svg)](https://deps.dev/go/go.dtapp.net/gojobs) + +#### 安装使用 + +```go +go get -v -u go.dtapp.net/gojobs +``` + +#### 导入 + +```go +import ( + "go.dtapp.net/gojobs" +) +``` \ No newline at end of file diff --git a/cron.go b/cron.go new file mode 100644 index 0000000..c1b99ee --- /dev/null +++ b/cron.go @@ -0,0 +1,43 @@ +package goip + +import ( + "go.dtapp.net/gojobs/pb" + "google.golang.org/grpc" +) + +// CronConfig 定时任务配置 +type CronConfig struct { + Address string // 服务端口 127.0.0.1:8888 +} + +// Cron 定时任务 +type Cron struct { + CronConfig // 配置 + Pub pb.PubSubClient // 订阅 + Conn *grpc.ClientConn // 链接信息 +} + +// NewCron 创建定时任务 +func NewCron(config *CronConfig) *Cron { + + if config.Address == "" { + panic("请填写服务端口") + } + + c := &Cron{} + + c.Address = config.Address + + var err error + + // 建立连接 获取client + c.Conn, err = grpc.Dial(c.Address, grpc.WithInsecure()) + if err != nil { + panic("连接失败: " + err.Error()) + } + + // 新建一个客户端 + c.Pub = pb.NewPubSubClient(c.Conn) + + return c +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..833dfe8 --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module go.dtapp.net/gojobs + +go 1.18 + +require ( + google.golang.org/grpc v1.46.2 + google.golang.org/protobuf v1.28.0 +) + +require ( + github.com/golang/protobuf v1.5.2 // indirect + golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect + golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect + golang.org/x/text v0.3.7 // indirect + google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..d3efd1a --- /dev/null +++ b/go.sum @@ -0,0 +1,139 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= +github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 h1:NWy5+hlRbC7HK+PmcXVUmW1IMyFce7to56IUvhUFm7Y= +golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd h1:e0TwkXOdbnH/1x5rc5MZ/VYyiZ4v+RdVfrGMqEwT68I= +google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= +google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= +google.golang.org/grpc v1.46.2 h1:u+MLGgVf7vRdjEYZ8wDFhAVNmhkbJ5hmrA1LMWK1CAQ= +google.golang.org/grpc v1.46.2/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= +google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/grpc_build.sh b/grpc_build.sh new file mode 100644 index 0000000..90ad8f7 --- /dev/null +++ b/grpc_build.sh @@ -0,0 +1,5 @@ +#protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative --grpc-gateway_out . --grpc-gateway_opt paths=source_relative ./pb/basics.proto + +protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative ./pb/basics.proto +protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative ./pb/task.proto +protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative ./pb/pubsub.proto \ No newline at end of file diff --git a/pb/basics.pb.go b/pb/basics.pb.go new file mode 100644 index 0000000..d8f4273 --- /dev/null +++ b/pb/basics.pb.go @@ -0,0 +1,221 @@ +// 版本 + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.0 +// protoc v3.19.4 +// source: pb/basics.proto + +// 包名 + +package pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// 请求消息 +type BasicsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` +} + +func (x *BasicsRequest) Reset() { + *x = BasicsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pb_basics_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BasicsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BasicsRequest) ProtoMessage() {} + +func (x *BasicsRequest) ProtoReflect() protoreflect.Message { + mi := &file_pb_basics_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BasicsRequest.ProtoReflect.Descriptor instead. +func (*BasicsRequest) Descriptor() ([]byte, []int) { + return file_pb_basics_proto_rawDescGZIP(), []int{0} +} + +func (x *BasicsRequest) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +// 响应消息 +type BasicsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` +} + +func (x *BasicsResponse) Reset() { + *x = BasicsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pb_basics_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BasicsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BasicsResponse) ProtoMessage() {} + +func (x *BasicsResponse) ProtoReflect() protoreflect.Message { + mi := &file_pb_basics_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BasicsResponse.ProtoReflect.Descriptor instead. +func (*BasicsResponse) Descriptor() ([]byte, []int) { + return file_pb_basics_proto_rawDescGZIP(), []int{1} +} + +func (x *BasicsResponse) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +var File_pb_basics_proto protoreflect.FileDescriptor + +var file_pb_basics_proto_rawDesc = []byte{ + 0x0a, 0x0f, 0x70, 0x62, 0x2f, 0x62, 0x61, 0x73, 0x69, 0x63, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0x29, 0x0a, 0x0d, 0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x22, 0x2a, 0x0a, 0x0e, 0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x6a, 0x0a, 0x06, + 0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x12, 0x2f, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x11, + 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x12, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x2f, 0x0a, 0x04, 0x50, 0x6f, 0x6e, 0x67, 0x12, + 0x11, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x2e, 0x2f, 0x70, + 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_pb_basics_proto_rawDescOnce sync.Once + file_pb_basics_proto_rawDescData = file_pb_basics_proto_rawDesc +) + +func file_pb_basics_proto_rawDescGZIP() []byte { + file_pb_basics_proto_rawDescOnce.Do(func() { + file_pb_basics_proto_rawDescData = protoimpl.X.CompressGZIP(file_pb_basics_proto_rawDescData) + }) + return file_pb_basics_proto_rawDescData +} + +var file_pb_basics_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_pb_basics_proto_goTypes = []interface{}{ + (*BasicsRequest)(nil), // 0: pb.BasicsRequest + (*BasicsResponse)(nil), // 1: pb.BasicsResponse +} +var file_pb_basics_proto_depIdxs = []int32{ + 0, // 0: pb.Basics.Ping:input_type -> pb.BasicsRequest + 0, // 1: pb.Basics.Pong:input_type -> pb.BasicsRequest + 1, // 2: pb.Basics.Ping:output_type -> pb.BasicsResponse + 1, // 3: pb.Basics.Pong:output_type -> pb.BasicsResponse + 2, // [2:4] is the sub-list for method output_type + 0, // [0:2] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_pb_basics_proto_init() } +func file_pb_basics_proto_init() { + if File_pb_basics_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_pb_basics_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BasicsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pb_basics_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BasicsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_pb_basics_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_pb_basics_proto_goTypes, + DependencyIndexes: file_pb_basics_proto_depIdxs, + MessageInfos: file_pb_basics_proto_msgTypes, + }.Build() + File_pb_basics_proto = out.File + file_pb_basics_proto_rawDesc = nil + file_pb_basics_proto_goTypes = nil + file_pb_basics_proto_depIdxs = nil +} diff --git a/pb/basics.proto b/pb/basics.proto new file mode 100644 index 0000000..32f526b --- /dev/null +++ b/pb/basics.proto @@ -0,0 +1,25 @@ +// 版本 +syntax = "proto3"; + +// 包名 +package pb; + +// 别名 +option go_package = "../pb"; + +// 定义服务 +service Basics{ + // 心跳 + rpc Ping(BasicsRequest) returns (BasicsResponse){}; + // 心跳 + rpc Pong(BasicsRequest) returns (BasicsResponse){}; +} +// 请求消息 +message BasicsRequest { + string message = 1; +} + +// 响应消息 +message BasicsResponse { + string message = 1; +} diff --git a/pb/basics_grpc.pb.go b/pb/basics_grpc.pb.go new file mode 100644 index 0000000..2ea488a --- /dev/null +++ b/pb/basics_grpc.pb.go @@ -0,0 +1,145 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.19.4 +// source: pb/basics.proto + +package pb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// BasicsClient is the client API for Basics service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type BasicsClient interface { + // 心跳 + Ping(ctx context.Context, in *BasicsRequest, opts ...grpc.CallOption) (*BasicsResponse, error) + // 心跳 + Pong(ctx context.Context, in *BasicsRequest, opts ...grpc.CallOption) (*BasicsResponse, error) +} + +type basicsClient struct { + cc grpc.ClientConnInterface +} + +func NewBasicsClient(cc grpc.ClientConnInterface) BasicsClient { + return &basicsClient{cc} +} + +func (c *basicsClient) Ping(ctx context.Context, in *BasicsRequest, opts ...grpc.CallOption) (*BasicsResponse, error) { + out := new(BasicsResponse) + err := c.cc.Invoke(ctx, "/pb.Basics/Ping", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *basicsClient) Pong(ctx context.Context, in *BasicsRequest, opts ...grpc.CallOption) (*BasicsResponse, error) { + out := new(BasicsResponse) + err := c.cc.Invoke(ctx, "/pb.Basics/Pong", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// BasicsServer is the server API for Basics service. +// All implementations must embed UnimplementedBasicsServer +// for forward compatibility +type BasicsServer interface { + // 心跳 + Ping(context.Context, *BasicsRequest) (*BasicsResponse, error) + // 心跳 + Pong(context.Context, *BasicsRequest) (*BasicsResponse, error) + mustEmbedUnimplementedBasicsServer() +} + +// UnimplementedBasicsServer must be embedded to have forward compatible implementations. +type UnimplementedBasicsServer struct { +} + +func (UnimplementedBasicsServer) Ping(context.Context, *BasicsRequest) (*BasicsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") +} +func (UnimplementedBasicsServer) Pong(context.Context, *BasicsRequest) (*BasicsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Pong not implemented") +} +func (UnimplementedBasicsServer) mustEmbedUnimplementedBasicsServer() {} + +// UnsafeBasicsServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to BasicsServer will +// result in compilation errors. +type UnsafeBasicsServer interface { + mustEmbedUnimplementedBasicsServer() +} + +func RegisterBasicsServer(s grpc.ServiceRegistrar, srv BasicsServer) { + s.RegisterService(&Basics_ServiceDesc, srv) +} + +func _Basics_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BasicsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BasicsServer).Ping(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/pb.Basics/Ping", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BasicsServer).Ping(ctx, req.(*BasicsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Basics_Pong_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BasicsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BasicsServer).Pong(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/pb.Basics/Pong", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BasicsServer).Pong(ctx, req.(*BasicsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// Basics_ServiceDesc is the grpc.ServiceDesc for Basics service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Basics_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "pb.Basics", + HandlerType: (*BasicsServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Ping", + Handler: _Basics_Ping_Handler, + }, + { + MethodName: "Pong", + Handler: _Basics_Pong_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "pb/basics.proto", +} diff --git a/pb/pubsub.pb.go b/pb/pubsub.pb.go new file mode 100644 index 0000000..90412c5 --- /dev/null +++ b/pb/pubsub.pb.go @@ -0,0 +1,155 @@ +// 版本 + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.0 +// protoc v3.19.4 +// source: pb/pubsub.proto + +// 包名 + +package pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// 消息 +type String struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` +} + +func (x *String) Reset() { + *x = String{} + if protoimpl.UnsafeEnabled { + mi := &file_pb_pubsub_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *String) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*String) ProtoMessage() {} + +func (x *String) ProtoReflect() protoreflect.Message { + mi := &file_pb_pubsub_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use String.ProtoReflect.Descriptor instead. +func (*String) Descriptor() ([]byte, []int) { + return file_pb_pubsub_proto_rawDescGZIP(), []int{0} +} + +func (x *String) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + +var File_pb_pubsub_proto protoreflect.FileDescriptor + +var file_pb_pubsub_proto_rawDesc = []byte{ + 0x0a, 0x0f, 0x70, 0x62, 0x2f, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0x1e, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x12, + 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x32, 0x52, 0x0a, 0x06, 0x50, 0x75, 0x62, 0x53, 0x75, 0x62, 0x12, + 0x21, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x0a, 0x2e, 0x70, 0x62, 0x2e, + 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x1a, 0x0a, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x72, 0x69, + 0x6e, 0x67, 0x12, 0x25, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, + 0x0a, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x1a, 0x0a, 0x2e, 0x70, 0x62, + 0x2e, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x30, 0x01, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x2e, 0x2f, + 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_pb_pubsub_proto_rawDescOnce sync.Once + file_pb_pubsub_proto_rawDescData = file_pb_pubsub_proto_rawDesc +) + +func file_pb_pubsub_proto_rawDescGZIP() []byte { + file_pb_pubsub_proto_rawDescOnce.Do(func() { + file_pb_pubsub_proto_rawDescData = protoimpl.X.CompressGZIP(file_pb_pubsub_proto_rawDescData) + }) + return file_pb_pubsub_proto_rawDescData +} + +var file_pb_pubsub_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_pb_pubsub_proto_goTypes = []interface{}{ + (*String)(nil), // 0: pb.String +} +var file_pb_pubsub_proto_depIdxs = []int32{ + 0, // 0: pb.PubSub.Publish:input_type -> pb.String + 0, // 1: pb.PubSub.Subscribe:input_type -> pb.String + 0, // 2: pb.PubSub.Publish:output_type -> pb.String + 0, // 3: pb.PubSub.Subscribe:output_type -> pb.String + 2, // [2:4] is the sub-list for method output_type + 0, // [0:2] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_pb_pubsub_proto_init() } +func file_pb_pubsub_proto_init() { + if File_pb_pubsub_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_pb_pubsub_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*String); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_pb_pubsub_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_pb_pubsub_proto_goTypes, + DependencyIndexes: file_pb_pubsub_proto_depIdxs, + MessageInfos: file_pb_pubsub_proto_msgTypes, + }.Build() + File_pb_pubsub_proto = out.File + file_pb_pubsub_proto_rawDesc = nil + file_pb_pubsub_proto_goTypes = nil + file_pb_pubsub_proto_depIdxs = nil +} diff --git a/pb/pubsub.proto b/pb/pubsub.proto new file mode 100644 index 0000000..f5d1aa1 --- /dev/null +++ b/pb/pubsub.proto @@ -0,0 +1,21 @@ +// 版本 +syntax = "proto3"; + +// 包名 +package pb; + +// 别名 +option go_package = "../pb"; + +// 定义服务 +service PubSub { + // [发布] 消息 + rpc Publish (String) returns (String); + // [订阅] 消息 + rpc Subscribe (String) returns (stream String); +} + +// 消息 +message String { + string value = 1; +} diff --git a/pb/pubsub.server.go b/pb/pubsub.server.go new file mode 100644 index 0000000..66ff01c --- /dev/null +++ b/pb/pubsub.server.go @@ -0,0 +1,54 @@ +package pb + +import ( + "context" + "go.dtapp.net/gojobs/pubsub" + "strings" + "time" +) + +type PubSubServerService struct { + pub *pubsub.Publisher + UnimplementedPubSubServer +} + +func NewPubSubServerService() *PubSubServerService { + return &PubSubServerService{ + // 新建一个Publisher对象 + pub: pubsub.NewPublisher(time.Millisecond*100, 10), + } +} + +// Publish 实现发布方法 +func (p *PubSubServerService) Publish(ctx context.Context, arg *String) (*String, error) { + // 发布消息 + p.pub.Publish(arg.GetValue()) + return &String{Value: arg.GetValue()}, nil +} + +// Subscribe 实现订阅方法 +func (p *PubSubServerService) Subscribe(arg *String, stream PubSub_SubscribeServer) error { + // SubscribeTopic 增加一个使用函数过滤器的订阅者 + // func(v interface{}) 定义函数过滤的规则 + // SubscribeTopic 返回一个chan interface{} + + ch := p.pub.SubscribeTopic(func(v interface{}) bool { + // 接收数据是string,并且key是以arg为前缀的 + if key, ok := v.(string); ok { + if strings.HasPrefix(key, arg.GetValue()) { + return true + } + } + return false + }) + + // 服务器遍历chan,并将其中信息发送给订阅客户端 + for v := range ch { + err := stream.Send(&String{Value: v.(string)}) + if err != nil { + return err + } + } + + return nil +} diff --git a/pb/pubsub_grpc.pb.go b/pb/pubsub_grpc.pb.go new file mode 100644 index 0000000..5fcdaec --- /dev/null +++ b/pb/pubsub_grpc.pb.go @@ -0,0 +1,173 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.19.4 +// source: pb/pubsub.proto + +package pb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// PubSubClient is the client API for PubSub service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type PubSubClient interface { + // [发布] 消息 + Publish(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error) + // [订阅] 消息 + Subscribe(ctx context.Context, in *String, opts ...grpc.CallOption) (PubSub_SubscribeClient, error) +} + +type pubSubClient struct { + cc grpc.ClientConnInterface +} + +func NewPubSubClient(cc grpc.ClientConnInterface) PubSubClient { + return &pubSubClient{cc} +} + +func (c *pubSubClient) Publish(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error) { + out := new(String) + err := c.cc.Invoke(ctx, "/pb.PubSub/Publish", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *pubSubClient) Subscribe(ctx context.Context, in *String, opts ...grpc.CallOption) (PubSub_SubscribeClient, error) { + stream, err := c.cc.NewStream(ctx, &PubSub_ServiceDesc.Streams[0], "/pb.PubSub/Subscribe", opts...) + if err != nil { + return nil, err + } + x := &pubSubSubscribeClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type PubSub_SubscribeClient interface { + Recv() (*String, error) + grpc.ClientStream +} + +type pubSubSubscribeClient struct { + grpc.ClientStream +} + +func (x *pubSubSubscribeClient) Recv() (*String, error) { + m := new(String) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// PubSubServer is the server API for PubSub service. +// All implementations must embed UnimplementedPubSubServer +// for forward compatibility +type PubSubServer interface { + // [发布] 消息 + Publish(context.Context, *String) (*String, error) + // [订阅] 消息 + Subscribe(*String, PubSub_SubscribeServer) error + mustEmbedUnimplementedPubSubServer() +} + +// UnimplementedPubSubServer must be embedded to have forward compatible implementations. +type UnimplementedPubSubServer struct { +} + +func (UnimplementedPubSubServer) Publish(context.Context, *String) (*String, error) { + return nil, status.Errorf(codes.Unimplemented, "method Publish not implemented") +} +func (UnimplementedPubSubServer) Subscribe(*String, PubSub_SubscribeServer) error { + return status.Errorf(codes.Unimplemented, "method Subscribe not implemented") +} +func (UnimplementedPubSubServer) mustEmbedUnimplementedPubSubServer() {} + +// UnsafePubSubServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to PubSubServer will +// result in compilation errors. +type UnsafePubSubServer interface { + mustEmbedUnimplementedPubSubServer() +} + +func RegisterPubSubServer(s grpc.ServiceRegistrar, srv PubSubServer) { + s.RegisterService(&PubSub_ServiceDesc, srv) +} + +func _PubSub_Publish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(String) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(PubSubServer).Publish(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/pb.PubSub/Publish", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(PubSubServer).Publish(ctx, req.(*String)) + } + return interceptor(ctx, in, info, handler) +} + +func _PubSub_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(String) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(PubSubServer).Subscribe(m, &pubSubSubscribeServer{stream}) +} + +type PubSub_SubscribeServer interface { + Send(*String) error + grpc.ServerStream +} + +type pubSubSubscribeServer struct { + grpc.ServerStream +} + +func (x *pubSubSubscribeServer) Send(m *String) error { + return x.ServerStream.SendMsg(m) +} + +// PubSub_ServiceDesc is the grpc.ServiceDesc for PubSub service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var PubSub_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "pb.PubSub", + HandlerType: (*PubSubServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Publish", + Handler: _PubSub_Publish_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "Subscribe", + Handler: _PubSub_Subscribe_Handler, + ServerStreams: true, + }, + }, + Metadata: "pb/pubsub.proto", +} diff --git a/pb/task.pb.go b/pb/task.pb.go new file mode 100644 index 0000000..5388093 --- /dev/null +++ b/pb/task.pb.go @@ -0,0 +1,234 @@ +// 版本 + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.0 +// protoc v3.19.4 +// source: pb/task.proto + +// 包名 + +package pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// 请求消息 +type TaskRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` +} + +func (x *TaskRequest) Reset() { + *x = TaskRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pb_task_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TaskRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TaskRequest) ProtoMessage() {} + +func (x *TaskRequest) ProtoReflect() protoreflect.Message { + mi := &file_pb_task_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TaskRequest.ProtoReflect.Descriptor instead. +func (*TaskRequest) Descriptor() ([]byte, []int) { + return file_pb_task_proto_rawDescGZIP(), []int{0} +} + +func (x *TaskRequest) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +// 响应消息 +type TaskResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` +} + +func (x *TaskResponse) Reset() { + *x = TaskResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pb_task_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TaskResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TaskResponse) ProtoMessage() {} + +func (x *TaskResponse) ProtoReflect() protoreflect.Message { + mi := &file_pb_task_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TaskResponse.ProtoReflect.Descriptor instead. +func (*TaskResponse) Descriptor() ([]byte, []int) { + return file_pb_task_proto_rawDescGZIP(), []int{1} +} + +func (x *TaskResponse) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +var File_pb_task_proto protoreflect.FileDescriptor + +var file_pb_task_proto_rawDesc = []byte{ + 0x0a, 0x0d, 0x70, 0x62, 0x2f, 0x74, 0x61, 0x73, 0x6b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x02, 0x70, 0x62, 0x22, 0x27, 0x0a, 0x0b, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x28, 0x0a, 0x0c, + 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0xfb, 0x01, 0x0a, 0x04, 0x54, 0x61, 0x73, 0x6b, 0x12, + 0x30, 0x0a, 0x09, 0x55, 0x6e, 0x61, 0x72, 0x79, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x0f, 0x2e, 0x70, + 0x62, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, + 0x70, 0x62, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x00, 0x12, 0x3c, 0x0a, 0x13, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x69, 0x6e, 0x67, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x54, 0x61, + 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x70, 0x62, 0x2e, 0x54, + 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, + 0x3c, 0x0a, 0x13, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, + 0x6e, 0x67, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x54, 0x61, 0x73, 0x6b, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x70, 0x62, 0x2e, 0x54, 0x61, 0x73, + 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x12, 0x45, 0x0a, + 0x1a, 0x42, 0x69, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x53, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x0f, 0x2e, 0x70, 0x62, + 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x70, + 0x62, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x28, 0x01, 0x30, 0x01, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_pb_task_proto_rawDescOnce sync.Once + file_pb_task_proto_rawDescData = file_pb_task_proto_rawDesc +) + +func file_pb_task_proto_rawDescGZIP() []byte { + file_pb_task_proto_rawDescOnce.Do(func() { + file_pb_task_proto_rawDescData = protoimpl.X.CompressGZIP(file_pb_task_proto_rawDescData) + }) + return file_pb_task_proto_rawDescData +} + +var file_pb_task_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_pb_task_proto_goTypes = []interface{}{ + (*TaskRequest)(nil), // 0: pb.TaskRequest + (*TaskResponse)(nil), // 1: pb.TaskResponse +} +var file_pb_task_proto_depIdxs = []int32{ + 0, // 0: pb.Task.UnaryTask:input_type -> pb.TaskRequest + 0, // 1: pb.Task.ServerStreamingTask:input_type -> pb.TaskRequest + 0, // 2: pb.Task.ClientStreamingTask:input_type -> pb.TaskRequest + 0, // 3: pb.Task.BidirectionalStreamingTask:input_type -> pb.TaskRequest + 1, // 4: pb.Task.UnaryTask:output_type -> pb.TaskResponse + 1, // 5: pb.Task.ServerStreamingTask:output_type -> pb.TaskResponse + 1, // 6: pb.Task.ClientStreamingTask:output_type -> pb.TaskResponse + 1, // 7: pb.Task.BidirectionalStreamingTask:output_type -> pb.TaskResponse + 4, // [4:8] is the sub-list for method output_type + 0, // [0:4] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_pb_task_proto_init() } +func file_pb_task_proto_init() { + if File_pb_task_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_pb_task_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TaskRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pb_task_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TaskResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_pb_task_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_pb_task_proto_goTypes, + DependencyIndexes: file_pb_task_proto_depIdxs, + MessageInfos: file_pb_task_proto_msgTypes, + }.Build() + File_pb_task_proto = out.File + file_pb_task_proto_rawDesc = nil + file_pb_task_proto_goTypes = nil + file_pb_task_proto_depIdxs = nil +} diff --git a/pb/task.proto b/pb/task.proto new file mode 100644 index 0000000..645059d --- /dev/null +++ b/pb/task.proto @@ -0,0 +1,30 @@ +// 版本 +syntax = "proto3"; + +// 包名 +package pb; + +// 别名 +option go_package = "../pb"; + +// 定义服务 +service Task{ + // 普通一元方法 + rpc UnaryTask(TaskRequest) returns (TaskResponse){}; + // 服务端推送流 + rpc ServerStreamingTask(TaskRequest) returns (stream TaskResponse){}; + // 客户端推送流 + rpc ClientStreamingTask(stream TaskRequest) returns (TaskResponse){}; + // 双向推送流 + rpc BidirectionalStreamingTask(stream TaskRequest) returns (stream TaskResponse){}; +} + +// 请求消息 +message TaskRequest { + string message = 1; +} + +// 响应消息 +message TaskResponse { + string message = 1; +} diff --git a/pb/task_grpc.pb.go b/pb/task_grpc.pb.go new file mode 100644 index 0000000..7e21fab --- /dev/null +++ b/pb/task_grpc.pb.go @@ -0,0 +1,315 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.19.4 +// source: pb/task.proto + +package pb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// TaskClient is the client API for Task service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type TaskClient interface { + // 普通一元方法 + UnaryTask(ctx context.Context, in *TaskRequest, opts ...grpc.CallOption) (*TaskResponse, error) + // 服务端推送流 + ServerStreamingTask(ctx context.Context, in *TaskRequest, opts ...grpc.CallOption) (Task_ServerStreamingTaskClient, error) + // 客户端推送流 + ClientStreamingTask(ctx context.Context, opts ...grpc.CallOption) (Task_ClientStreamingTaskClient, error) + // 双向推送流 + BidirectionalStreamingTask(ctx context.Context, opts ...grpc.CallOption) (Task_BidirectionalStreamingTaskClient, error) +} + +type taskClient struct { + cc grpc.ClientConnInterface +} + +func NewTaskClient(cc grpc.ClientConnInterface) TaskClient { + return &taskClient{cc} +} + +func (c *taskClient) UnaryTask(ctx context.Context, in *TaskRequest, opts ...grpc.CallOption) (*TaskResponse, error) { + out := new(TaskResponse) + err := c.cc.Invoke(ctx, "/pb.Task/UnaryTask", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *taskClient) ServerStreamingTask(ctx context.Context, in *TaskRequest, opts ...grpc.CallOption) (Task_ServerStreamingTaskClient, error) { + stream, err := c.cc.NewStream(ctx, &Task_ServiceDesc.Streams[0], "/pb.Task/ServerStreamingTask", opts...) + if err != nil { + return nil, err + } + x := &taskServerStreamingTaskClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Task_ServerStreamingTaskClient interface { + Recv() (*TaskResponse, error) + grpc.ClientStream +} + +type taskServerStreamingTaskClient struct { + grpc.ClientStream +} + +func (x *taskServerStreamingTaskClient) Recv() (*TaskResponse, error) { + m := new(TaskResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *taskClient) ClientStreamingTask(ctx context.Context, opts ...grpc.CallOption) (Task_ClientStreamingTaskClient, error) { + stream, err := c.cc.NewStream(ctx, &Task_ServiceDesc.Streams[1], "/pb.Task/ClientStreamingTask", opts...) + if err != nil { + return nil, err + } + x := &taskClientStreamingTaskClient{stream} + return x, nil +} + +type Task_ClientStreamingTaskClient interface { + Send(*TaskRequest) error + CloseAndRecv() (*TaskResponse, error) + grpc.ClientStream +} + +type taskClientStreamingTaskClient struct { + grpc.ClientStream +} + +func (x *taskClientStreamingTaskClient) Send(m *TaskRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *taskClientStreamingTaskClient) CloseAndRecv() (*TaskResponse, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(TaskResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *taskClient) BidirectionalStreamingTask(ctx context.Context, opts ...grpc.CallOption) (Task_BidirectionalStreamingTaskClient, error) { + stream, err := c.cc.NewStream(ctx, &Task_ServiceDesc.Streams[2], "/pb.Task/BidirectionalStreamingTask", opts...) + if err != nil { + return nil, err + } + x := &taskBidirectionalStreamingTaskClient{stream} + return x, nil +} + +type Task_BidirectionalStreamingTaskClient interface { + Send(*TaskRequest) error + Recv() (*TaskResponse, error) + grpc.ClientStream +} + +type taskBidirectionalStreamingTaskClient struct { + grpc.ClientStream +} + +func (x *taskBidirectionalStreamingTaskClient) Send(m *TaskRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *taskBidirectionalStreamingTaskClient) Recv() (*TaskResponse, error) { + m := new(TaskResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// TaskServer is the server API for Task service. +// All implementations must embed UnimplementedTaskServer +// for forward compatibility +type TaskServer interface { + // 普通一元方法 + UnaryTask(context.Context, *TaskRequest) (*TaskResponse, error) + // 服务端推送流 + ServerStreamingTask(*TaskRequest, Task_ServerStreamingTaskServer) error + // 客户端推送流 + ClientStreamingTask(Task_ClientStreamingTaskServer) error + // 双向推送流 + BidirectionalStreamingTask(Task_BidirectionalStreamingTaskServer) error + mustEmbedUnimplementedTaskServer() +} + +// UnimplementedTaskServer must be embedded to have forward compatible implementations. +type UnimplementedTaskServer struct { +} + +func (UnimplementedTaskServer) UnaryTask(context.Context, *TaskRequest) (*TaskResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method UnaryTask not implemented") +} +func (UnimplementedTaskServer) ServerStreamingTask(*TaskRequest, Task_ServerStreamingTaskServer) error { + return status.Errorf(codes.Unimplemented, "method ServerStreamingTask not implemented") +} +func (UnimplementedTaskServer) ClientStreamingTask(Task_ClientStreamingTaskServer) error { + return status.Errorf(codes.Unimplemented, "method ClientStreamingTask not implemented") +} +func (UnimplementedTaskServer) BidirectionalStreamingTask(Task_BidirectionalStreamingTaskServer) error { + return status.Errorf(codes.Unimplemented, "method BidirectionalStreamingTask not implemented") +} +func (UnimplementedTaskServer) mustEmbedUnimplementedTaskServer() {} + +// UnsafeTaskServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to TaskServer will +// result in compilation errors. +type UnsafeTaskServer interface { + mustEmbedUnimplementedTaskServer() +} + +func RegisterTaskServer(s grpc.ServiceRegistrar, srv TaskServer) { + s.RegisterService(&Task_ServiceDesc, srv) +} + +func _Task_UnaryTask_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(TaskRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TaskServer).UnaryTask(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/pb.Task/UnaryTask", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TaskServer).UnaryTask(ctx, req.(*TaskRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Task_ServerStreamingTask_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(TaskRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(TaskServer).ServerStreamingTask(m, &taskServerStreamingTaskServer{stream}) +} + +type Task_ServerStreamingTaskServer interface { + Send(*TaskResponse) error + grpc.ServerStream +} + +type taskServerStreamingTaskServer struct { + grpc.ServerStream +} + +func (x *taskServerStreamingTaskServer) Send(m *TaskResponse) error { + return x.ServerStream.SendMsg(m) +} + +func _Task_ClientStreamingTask_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(TaskServer).ClientStreamingTask(&taskClientStreamingTaskServer{stream}) +} + +type Task_ClientStreamingTaskServer interface { + SendAndClose(*TaskResponse) error + Recv() (*TaskRequest, error) + grpc.ServerStream +} + +type taskClientStreamingTaskServer struct { + grpc.ServerStream +} + +func (x *taskClientStreamingTaskServer) SendAndClose(m *TaskResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *taskClientStreamingTaskServer) Recv() (*TaskRequest, error) { + m := new(TaskRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _Task_BidirectionalStreamingTask_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(TaskServer).BidirectionalStreamingTask(&taskBidirectionalStreamingTaskServer{stream}) +} + +type Task_BidirectionalStreamingTaskServer interface { + Send(*TaskResponse) error + Recv() (*TaskRequest, error) + grpc.ServerStream +} + +type taskBidirectionalStreamingTaskServer struct { + grpc.ServerStream +} + +func (x *taskBidirectionalStreamingTaskServer) Send(m *TaskResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *taskBidirectionalStreamingTaskServer) Recv() (*TaskRequest, error) { + m := new(TaskRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Task_ServiceDesc is the grpc.ServiceDesc for Task service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Task_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "pb.Task", + HandlerType: (*TaskServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "UnaryTask", + Handler: _Task_UnaryTask_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "ServerStreamingTask", + Handler: _Task_ServerStreamingTask_Handler, + ServerStreams: true, + }, + { + StreamName: "ClientStreamingTask", + Handler: _Task_ClientStreamingTask_Handler, + ClientStreams: true, + }, + { + StreamName: "BidirectionalStreamingTask", + Handler: _Task_BidirectionalStreamingTask_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "pb/task.proto", +} diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go new file mode 100644 index 0000000..ef817b9 --- /dev/null +++ b/pubsub/pubsub.go @@ -0,0 +1,125 @@ +package pubsub + +import ( + "sync" + "time" +) + +// 等待组放在共享内存池中,减少GC +var wgPool = sync.Pool{New: func() interface{} { return new(sync.WaitGroup) }} + +// NewPublisher +// 第一个参数控制发布时最大阻塞时间 +// 第二个参数是缓冲区大小,控制每个订阅者的chan缓冲区大小 +func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher { + return &Publisher{ + buffer: buffer, + timeout: publishTimeout, + subscribers: make(map[subscriber]topicFunc), + } +} + +type subscriber chan interface{} +type topicFunc func(v interface{}) bool + +type Publisher struct { + m sync.RWMutex // 控制订阅者map并发读写安全 + buffer int // 每个订阅者chan缓冲区大小 + timeout time.Duration // 发布阻塞超时时间 + subscribers map[subscriber]topicFunc +} + +// Len 返回订阅者数量 +func (p *Publisher) Len() int { + p.m.RLock() + i := len(p.subscribers) + p.m.RUnlock() + return i +} + +// Subscribe 无Topic订阅 +func (p *Publisher) Subscribe() chan interface{} { + return p.SubscribeTopic(nil) +} + +// SubscribeTopic 通过Topic订阅 +func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { + ch := make(chan interface{}, p.buffer) + p.m.Lock() + p.subscribers[ch] = topic + p.m.Unlock() + return ch +} + +// SubscribeTopicWithBuffer 通过自定义chan缓冲区大小定义新的订阅者 +func (p *Publisher) SubscribeTopicWithBuffer(topic topicFunc, buffer int) chan interface{} { + ch := make(chan interface{}, buffer) + p.m.Lock() + p.subscribers[ch] = topic + p.m.Unlock() + return ch +} + +// Evict 移除某个订阅者 +func (p *Publisher) Evict(sub chan interface{}) { + p.m.Lock() + _, exists := p.subscribers[sub] + if exists { + delete(p.subscribers, sub) + close(sub) + } + p.m.Unlock() +} + +// Publish 发布消息 +func (p *Publisher) Publish(v interface{}) { + p.m.RLock() + if len(p.subscribers) == 0 { + p.m.RUnlock() + return + } + + wg := wgPool.Get().(*sync.WaitGroup) + for sub, topic := range p.subscribers { + wg.Add(1) + go p.sendTopic(sub, topic, v, wg) + } + wg.Wait() + wgPool.Put(wg) + p.m.RUnlock() +} + +// Close 关闭服务 +func (p *Publisher) Close() { + p.m.Lock() + for sub := range p.subscribers { + delete(p.subscribers, sub) + close(sub) + } + p.m.Unlock() +} + +// 真正发布消息的逻辑,通过Timer,根据传入的timeout控制每次发布消息最大阻塞时长 +func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) { + defer wg.Done() + if topic != nil && !topic(v) { + return + } + + // 如果接收器不可用,请在选择“不阻止”下发送 + if p.timeout > 0 { + timeout := time.NewTimer(p.timeout) + defer timeout.Stop() + + select { + case sub <- v: + case <-timeout.C: + } + return + } + + select { + case sub <- v: + default: + } +} diff --git a/server.go b/server.go new file mode 100644 index 0000000..28987ce --- /dev/null +++ b/server.go @@ -0,0 +1,68 @@ +package goip + +import ( + "errors" + "go.dtapp.net/gojobs/pb" + "go.dtapp.net/gojobs/pubsub" + "google.golang.org/grpc" + "log" + "net" + "time" +) + +// ServerConfig 服务配置 +type ServerConfig struct { + PublishTimeout time.Duration // 控制发布时最大阻塞时间 + PubBuffer int // 缓冲区大小,控制每个订阅者的chan缓冲区大小 + Address string // 服务端口 0.0.0.0:8888 +} + +// Server 服务 +type Server struct { + ServerConfig // 配置 + Pub *pubsub.Publisher // 订阅 + Conn *grpc.Server // 链接信息 +} + +// NewServer 创建服务和注册 +func NewServer(config *ServerConfig) *Server { + + if config.Address == "" { + panic("请填写服务端口") + } + + s := &Server{} + + s.PublishTimeout = config.PublishTimeout + s.PubBuffer = config.PubBuffer + s.Address = config.Address + + s.Pub = pubsub.NewPublisher(config.PublishTimeout, config.PubBuffer) + + // 创建gRPC服务器 + s.Conn = grpc.NewServer() + + // 注册 + pb.RegisterPubSubServer(s.Conn, pb.NewPubSubServerService()) + + return s +} + +// StartUp 启动 +func (s *Server) StartUp() error { + + // 监听本地端口 + lis, err := net.Listen("tcp", s.Address) + if err != nil { + return errors.New("创建监听失败:" + err.Error()) + } + log.Printf("正在监听的地址:%v", lis.Addr()) + + // 启动grpc + err = s.Conn.Serve(lis) + if err != nil { + return errors.New("创建服务失败:" + err.Error()) + } + + return nil +} diff --git a/version.go b/version.go new file mode 100644 index 0000000..f86496d --- /dev/null +++ b/version.go @@ -0,0 +1,3 @@ +package goip + +const Version = "1.0.0" diff --git a/version_test.go b/version_test.go new file mode 100644 index 0000000..96bb7f0 --- /dev/null +++ b/version_test.go @@ -0,0 +1,7 @@ +package goip + +import "testing" + +func TestVersion(t *testing.T) { + t.Log(Version) +} diff --git a/worker.go b/worker.go new file mode 100644 index 0000000..cbb4c71 --- /dev/null +++ b/worker.go @@ -0,0 +1,43 @@ +package goip + +import ( + "go.dtapp.net/gojobs/pb" + "google.golang.org/grpc" +) + +// WorkerConfig 工作配置 +type WorkerConfig struct { + Address string // 服务端口 127.0.0.1:8888 +} + +// Worker 工作 +type Worker struct { + WorkerConfig // 配置 + Pub pb.PubSubClient // 订阅 + Conn *grpc.ClientConn // 链接信息 +} + +// NewWorker 创建工作 +func NewWorker(config *WorkerConfig) *Worker { + + if config.Address == "" { + panic("请填写服务端口") + } + + w := &Worker{} + + w.Address = config.Address + + var err error + + // 建立连接 获取client + w.Conn, err = grpc.Dial(w.Address, grpc.WithInsecure()) + if err != nil { + panic("连接失败: " + err.Error()) + } + + // 新建一个客户端 + w.Pub = pb.NewPubSubClient(w.Conn) + + return w +}