- init
continuous-integration/drone/push Build is passing Details

master
李光春 2 years ago
commit ca4b274a9c

@ -0,0 +1,38 @@
name: "CodeQL"
on:
push:
branches: [ master ]
pull_request:
branches: [ master ]
schedule:
- cron: '36 15 * * 2'
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'go' ]
steps:
- name: Checkout repository
uses: actions/checkout@v3
- name: Initialize CodeQL
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
- name: Autobuild
uses: github/codeql-action/autobuild@v2
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v2

@ -0,0 +1,17 @@
name: Go
on:
push:
branches: [ master ]
pull_request:
branches: [ master ]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.18
- name: Test
run: go test -v ./...

@ -0,0 +1,27 @@
version: '1.0'
name: go
displayName: go
triggers:
trigger: auto
pr:
branches:
include:
- master
stages:
- name: compile
displayName: 编译
strategy: naturally
trigger: auto
steps:
- step: build@golang
name: Test
displayName: Golang 测试
golangVersion: '1.8'
commands:
- go env -w GO111MODULE=on
- go env -w GOPROXY=https://goproxy.cn,direct
- go test -v ./...
notify: []
permissions:
- role: admin
members: []

@ -0,0 +1,40 @@
# Use the latest 2.1 version of CircleCI pipeline process engine.
# See: https://circleci.com/docs/2.0/configuration-reference
version: 2.1
# Define a job to be invoked later in a workflow.
# See: https://circleci.com/docs/2.0/configuration-reference/#jobs
jobs:
build:
working_directory: ~/repo
# Specify the execution environment. You can specify an image from Dockerhub or use one of our Convenience Images from CircleCI's Developer Hub.
# See: https://circleci.com/docs/2.0/configuration-reference/#docker-machine-macos-windows-executor
docker:
- image: circleci/golang:1.18
# Add steps to the job
# See: https://circleci.com/docs/2.0/configuration-reference/#steps
steps:
- checkout
- restore_cache:
keys:
- go-mod-v4-{{ checksum "go.sum" }}
- run:
name: Install Dependencies
command: go mod download
- save_cache:
key: go-mod-v4-{{ checksum "go.sum" }}
paths:
- "/go/pkg/mod"
- run:
name: Run tests
command: |
mkdir -p /tmp/test-reports
gotestsum --junitfile /tmp/test-reports/unit-tests.xml
- store_test_results:
path: /tmp/test-reports
# Invoke jobs via workflows
# See: https://circleci.com/docs/2.0/configuration-reference/#workflows
workflows:
sample: # This is the name of the workflow, feel free to change it to better match your workflow.
# Inside the workflow, you define the jobs you want to run.
jobs:
- build

@ -0,0 +1,11 @@
kind: pipeline
type: docker
name: clone
steps:
- name: Test
image: golang:1.18
commands:
- go env -w GO111MODULE=on
- go env -w GOPROXY=https://goproxy.cn,direct
- go test -v ./...

9
.gitignore vendored

@ -0,0 +1,9 @@
.env
.git
.svn
.idea
.vscode
*.log
gomod.sh
/vendor/
grpc_build.sh

@ -0,0 +1,25 @@
<h1>
<a href="https://www.dtapp.net/">Golang Jobs</a>
</h1>
📦 Golang Jobs
[comment]: <> (go)
[![godoc](https://pkg.go.dev/badge/go.dtapp.net/gojobs?status.svg)](https://pkg.go.dev/go.dtapp.net/gojobs)
[![goproxy.cn](https://goproxy.cn/stats/go.dtapp.net/gojobs/badges/download-count.svg)](https://goproxy.cn/stats/go.dtapp.net/gojobs)
[![goreportcard.com](https://goreportcard.com/badge/go.dtapp.net/gojobs )](https://goreportcard.com/report/go.dtapp.net/gojobs)
[![deps.dev](https://img.shields.io/badge/deps-go-red.svg)](https://deps.dev/go/go.dtapp.net/gojobs)
#### 安装使用
```go
go get -v -u go.dtapp.net/gojobs
```
#### 导入
```go
import (
"go.dtapp.net/gojobs"
)
```

@ -0,0 +1,43 @@
package goip
import (
"go.dtapp.net/gojobs/pb"
"google.golang.org/grpc"
)
// CronConfig 定时任务配置
type CronConfig struct {
Address string // 服务端口 127.0.0.1:8888
}
// Cron 定时任务
type Cron struct {
CronConfig // 配置
Pub pb.PubSubClient // 订阅
Conn *grpc.ClientConn // 链接信息
}
// NewCron 创建定时任务
func NewCron(config *CronConfig) *Cron {
if config.Address == "" {
panic("请填写服务端口")
}
c := &Cron{}
c.Address = config.Address
var err error
// 建立连接 获取client
c.Conn, err = grpc.Dial(c.Address, grpc.WithInsecure())
if err != nil {
panic("连接失败: " + err.Error())
}
// 新建一个客户端
c.Pub = pb.NewPubSubClient(c.Conn)
return c
}

@ -0,0 +1,16 @@
module go.dtapp.net/gojobs
go 1.18
require (
google.golang.org/grpc v1.46.2
google.golang.org/protobuf v1.28.0
)
require (
github.com/golang/protobuf v1.5.2 // indirect
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd // indirect
)

139
go.sum

@ -0,0 +1,139 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI=
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 h1:NWy5+hlRbC7HK+PmcXVUmW1IMyFce7to56IUvhUFm7Y=
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd h1:e0TwkXOdbnH/1x5rc5MZ/VYyiZ4v+RdVfrGMqEwT68I=
google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.46.2 h1:u+MLGgVf7vRdjEYZ8wDFhAVNmhkbJ5hmrA1LMWK1CAQ=
google.golang.org/grpc v1.46.2/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

@ -0,0 +1,5 @@
#protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative --grpc-gateway_out . --grpc-gateway_opt paths=source_relative ./pb/basics.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative ./pb/basics.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative ./pb/task.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative ./pb/pubsub.proto

@ -0,0 +1,221 @@
// 版本
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.0
// protoc v3.19.4
// source: pb/basics.proto
// 包名
package pb
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// 请求消息
type BasicsRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
}
func (x *BasicsRequest) Reset() {
*x = BasicsRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_basics_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *BasicsRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*BasicsRequest) ProtoMessage() {}
func (x *BasicsRequest) ProtoReflect() protoreflect.Message {
mi := &file_pb_basics_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use BasicsRequest.ProtoReflect.Descriptor instead.
func (*BasicsRequest) Descriptor() ([]byte, []int) {
return file_pb_basics_proto_rawDescGZIP(), []int{0}
}
func (x *BasicsRequest) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
// 响应消息
type BasicsResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
}
func (x *BasicsResponse) Reset() {
*x = BasicsResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_basics_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *BasicsResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*BasicsResponse) ProtoMessage() {}
func (x *BasicsResponse) ProtoReflect() protoreflect.Message {
mi := &file_pb_basics_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use BasicsResponse.ProtoReflect.Descriptor instead.
func (*BasicsResponse) Descriptor() ([]byte, []int) {
return file_pb_basics_proto_rawDescGZIP(), []int{1}
}
func (x *BasicsResponse) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
var File_pb_basics_proto protoreflect.FileDescriptor
var file_pb_basics_proto_rawDesc = []byte{
0x0a, 0x0f, 0x70, 0x62, 0x2f, 0x62, 0x61, 0x73, 0x69, 0x63, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0x29, 0x0a, 0x0d, 0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x22, 0x2a, 0x0a, 0x0e, 0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x6a, 0x0a, 0x06,
0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x12, 0x2f, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x11,
0x2e, 0x70, 0x62, 0x2e, 0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x12, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x2f, 0x0a, 0x04, 0x50, 0x6f, 0x6e, 0x67, 0x12,
0x11, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x12, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x2e, 0x2f, 0x70,
0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_pb_basics_proto_rawDescOnce sync.Once
file_pb_basics_proto_rawDescData = file_pb_basics_proto_rawDesc
)
func file_pb_basics_proto_rawDescGZIP() []byte {
file_pb_basics_proto_rawDescOnce.Do(func() {
file_pb_basics_proto_rawDescData = protoimpl.X.CompressGZIP(file_pb_basics_proto_rawDescData)
})
return file_pb_basics_proto_rawDescData
}
var file_pb_basics_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_pb_basics_proto_goTypes = []interface{}{
(*BasicsRequest)(nil), // 0: pb.BasicsRequest
(*BasicsResponse)(nil), // 1: pb.BasicsResponse
}
var file_pb_basics_proto_depIdxs = []int32{
0, // 0: pb.Basics.Ping:input_type -> pb.BasicsRequest
0, // 1: pb.Basics.Pong:input_type -> pb.BasicsRequest
1, // 2: pb.Basics.Ping:output_type -> pb.BasicsResponse
1, // 3: pb.Basics.Pong:output_type -> pb.BasicsResponse
2, // [2:4] is the sub-list for method output_type
0, // [0:2] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_pb_basics_proto_init() }
func file_pb_basics_proto_init() {
if File_pb_basics_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_pb_basics_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*BasicsRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pb_basics_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*BasicsResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pb_basics_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_pb_basics_proto_goTypes,
DependencyIndexes: file_pb_basics_proto_depIdxs,
MessageInfos: file_pb_basics_proto_msgTypes,
}.Build()
File_pb_basics_proto = out.File
file_pb_basics_proto_rawDesc = nil
file_pb_basics_proto_goTypes = nil
file_pb_basics_proto_depIdxs = nil
}

@ -0,0 +1,25 @@
//
syntax = "proto3";
//
package pb;
//
option go_package = "../pb";
//
service Basics{
//
rpc Ping(BasicsRequest) returns (BasicsResponse){};
//
rpc Pong(BasicsRequest) returns (BasicsResponse){};
}
//
message BasicsRequest {
string message = 1;
}
//
message BasicsResponse {
string message = 1;
}

@ -0,0 +1,145 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.19.4
// source: pb/basics.proto
package pb
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// BasicsClient is the client API for Basics service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type BasicsClient interface {
// 心跳
Ping(ctx context.Context, in *BasicsRequest, opts ...grpc.CallOption) (*BasicsResponse, error)
// 心跳
Pong(ctx context.Context, in *BasicsRequest, opts ...grpc.CallOption) (*BasicsResponse, error)
}
type basicsClient struct {
cc grpc.ClientConnInterface
}
func NewBasicsClient(cc grpc.ClientConnInterface) BasicsClient {
return &basicsClient{cc}
}
func (c *basicsClient) Ping(ctx context.Context, in *BasicsRequest, opts ...grpc.CallOption) (*BasicsResponse, error) {
out := new(BasicsResponse)
err := c.cc.Invoke(ctx, "/pb.Basics/Ping", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *basicsClient) Pong(ctx context.Context, in *BasicsRequest, opts ...grpc.CallOption) (*BasicsResponse, error) {
out := new(BasicsResponse)
err := c.cc.Invoke(ctx, "/pb.Basics/Pong", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// BasicsServer is the server API for Basics service.
// All implementations must embed UnimplementedBasicsServer
// for forward compatibility
type BasicsServer interface {
// 心跳
Ping(context.Context, *BasicsRequest) (*BasicsResponse, error)
// 心跳
Pong(context.Context, *BasicsRequest) (*BasicsResponse, error)
mustEmbedUnimplementedBasicsServer()
}
// UnimplementedBasicsServer must be embedded to have forward compatible implementations.
type UnimplementedBasicsServer struct {
}
func (UnimplementedBasicsServer) Ping(context.Context, *BasicsRequest) (*BasicsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented")
}
func (UnimplementedBasicsServer) Pong(context.Context, *BasicsRequest) (*BasicsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Pong not implemented")
}
func (UnimplementedBasicsServer) mustEmbedUnimplementedBasicsServer() {}
// UnsafeBasicsServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to BasicsServer will
// result in compilation errors.
type UnsafeBasicsServer interface {
mustEmbedUnimplementedBasicsServer()
}
func RegisterBasicsServer(s grpc.ServiceRegistrar, srv BasicsServer) {
s.RegisterService(&Basics_ServiceDesc, srv)
}
func _Basics_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BasicsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BasicsServer).Ping(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/pb.Basics/Ping",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BasicsServer).Ping(ctx, req.(*BasicsRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Basics_Pong_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BasicsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BasicsServer).Pong(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/pb.Basics/Pong",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BasicsServer).Pong(ctx, req.(*BasicsRequest))
}
return interceptor(ctx, in, info, handler)
}
// Basics_ServiceDesc is the grpc.ServiceDesc for Basics service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Basics_ServiceDesc = grpc.ServiceDesc{
ServiceName: "pb.Basics",
HandlerType: (*BasicsServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Ping",
Handler: _Basics_Ping_Handler,
},
{
MethodName: "Pong",
Handler: _Basics_Pong_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "pb/basics.proto",
}

@ -0,0 +1,155 @@
// 版本
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.0
// protoc v3.19.4
// source: pb/pubsub.proto
// 包名
package pb
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// 消息
type String struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"`
}
func (x *String) Reset() {
*x = String{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_pubsub_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *String) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*String) ProtoMessage() {}
func (x *String) ProtoReflect() protoreflect.Message {
mi := &file_pb_pubsub_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use String.ProtoReflect.Descriptor instead.
func (*String) Descriptor() ([]byte, []int) {
return file_pb_pubsub_proto_rawDescGZIP(), []int{0}
}
func (x *String) GetValue() string {
if x != nil {
return x.Value
}
return ""
}
var File_pb_pubsub_proto protoreflect.FileDescriptor
var file_pb_pubsub_proto_rawDesc = []byte{
0x0a, 0x0f, 0x70, 0x62, 0x2f, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0x1e, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x12,
0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05,
0x76, 0x61, 0x6c, 0x75, 0x65, 0x32, 0x52, 0x0a, 0x06, 0x50, 0x75, 0x62, 0x53, 0x75, 0x62, 0x12,
0x21, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x0a, 0x2e, 0x70, 0x62, 0x2e,
0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x1a, 0x0a, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x72, 0x69,
0x6e, 0x67, 0x12, 0x25, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12,
0x0a, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x1a, 0x0a, 0x2e, 0x70, 0x62,
0x2e, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x30, 0x01, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x2e, 0x2f,
0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_pb_pubsub_proto_rawDescOnce sync.Once
file_pb_pubsub_proto_rawDescData = file_pb_pubsub_proto_rawDesc
)
func file_pb_pubsub_proto_rawDescGZIP() []byte {
file_pb_pubsub_proto_rawDescOnce.Do(func() {
file_pb_pubsub_proto_rawDescData = protoimpl.X.CompressGZIP(file_pb_pubsub_proto_rawDescData)
})
return file_pb_pubsub_proto_rawDescData
}
var file_pb_pubsub_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_pb_pubsub_proto_goTypes = []interface{}{
(*String)(nil), // 0: pb.String
}
var file_pb_pubsub_proto_depIdxs = []int32{
0, // 0: pb.PubSub.Publish:input_type -> pb.String
0, // 1: pb.PubSub.Subscribe:input_type -> pb.String
0, // 2: pb.PubSub.Publish:output_type -> pb.String
0, // 3: pb.PubSub.Subscribe:output_type -> pb.String
2, // [2:4] is the sub-list for method output_type
0, // [0:2] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_pb_pubsub_proto_init() }
func file_pb_pubsub_proto_init() {
if File_pb_pubsub_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_pb_pubsub_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*String); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pb_pubsub_proto_rawDesc,
NumEnums: 0,
NumMessages: 1,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_pb_pubsub_proto_goTypes,
DependencyIndexes: file_pb_pubsub_proto_depIdxs,
MessageInfos: file_pb_pubsub_proto_msgTypes,
}.Build()
File_pb_pubsub_proto = out.File
file_pb_pubsub_proto_rawDesc = nil
file_pb_pubsub_proto_goTypes = nil
file_pb_pubsub_proto_depIdxs = nil
}

@ -0,0 +1,21 @@
//
syntax = "proto3";
//
package pb;
//
option go_package = "../pb";
//
service PubSub {
// []
rpc Publish (String) returns (String);
// []
rpc Subscribe (String) returns (stream String);
}
//
message String {
string value = 1;
}

@ -0,0 +1,54 @@
package pb
import (
"context"
"go.dtapp.net/gojobs/pubsub"
"strings"
"time"
)
type PubSubServerService struct {
pub *pubsub.Publisher
UnimplementedPubSubServer
}
func NewPubSubServerService() *PubSubServerService {
return &PubSubServerService{
// 新建一个Publisher对象
pub: pubsub.NewPublisher(time.Millisecond*100, 10),
}
}
// Publish 实现发布方法
func (p *PubSubServerService) Publish(ctx context.Context, arg *String) (*String, error) {
// 发布消息
p.pub.Publish(arg.GetValue())
return &String{Value: arg.GetValue()}, nil
}
// Subscribe 实现订阅方法
func (p *PubSubServerService) Subscribe(arg *String, stream PubSub_SubscribeServer) error {
// SubscribeTopic 增加一个使用函数过滤器的订阅者
// func(v interface{}) 定义函数过滤的规则
// SubscribeTopic 返回一个chan interface{}
ch := p.pub.SubscribeTopic(func(v interface{}) bool {
// 接收数据是string并且key是以arg为前缀的
if key, ok := v.(string); ok {
if strings.HasPrefix(key, arg.GetValue()) {
return true
}
}
return false
})
// 服务器遍历chan并将其中信息发送给订阅客户端
for v := range ch {
err := stream.Send(&String{Value: v.(string)})
if err != nil {
return err
}
}
return nil
}

@ -0,0 +1,173 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.19.4
// source: pb/pubsub.proto
package pb
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// PubSubClient is the client API for PubSub service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type PubSubClient interface {
// [发布] 消息
Publish(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error)
// [订阅] 消息
Subscribe(ctx context.Context, in *String, opts ...grpc.CallOption) (PubSub_SubscribeClient, error)
}
type pubSubClient struct {
cc grpc.ClientConnInterface
}
func NewPubSubClient(cc grpc.ClientConnInterface) PubSubClient {
return &pubSubClient{cc}
}
func (c *pubSubClient) Publish(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error) {
out := new(String)
err := c.cc.Invoke(ctx, "/pb.PubSub/Publish", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *pubSubClient) Subscribe(ctx context.Context, in *String, opts ...grpc.CallOption) (PubSub_SubscribeClient, error) {
stream, err := c.cc.NewStream(ctx, &PubSub_ServiceDesc.Streams[0], "/pb.PubSub/Subscribe", opts...)
if err != nil {
return nil, err
}
x := &pubSubSubscribeClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type PubSub_SubscribeClient interface {
Recv() (*String, error)
grpc.ClientStream
}
type pubSubSubscribeClient struct {
grpc.ClientStream
}
func (x *pubSubSubscribeClient) Recv() (*String, error) {
m := new(String)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// PubSubServer is the server API for PubSub service.
// All implementations must embed UnimplementedPubSubServer
// for forward compatibility
type PubSubServer interface {
// [发布] 消息
Publish(context.Context, *String) (*String, error)
// [订阅] 消息
Subscribe(*String, PubSub_SubscribeServer) error
mustEmbedUnimplementedPubSubServer()
}
// UnimplementedPubSubServer must be embedded to have forward compatible implementations.
type UnimplementedPubSubServer struct {
}
func (UnimplementedPubSubServer) Publish(context.Context, *String) (*String, error) {
return nil, status.Errorf(codes.Unimplemented, "method Publish not implemented")
}
func (UnimplementedPubSubServer) Subscribe(*String, PubSub_SubscribeServer) error {
return status.Errorf(codes.Unimplemented, "method Subscribe not implemented")
}
func (UnimplementedPubSubServer) mustEmbedUnimplementedPubSubServer() {}
// UnsafePubSubServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to PubSubServer will
// result in compilation errors.
type UnsafePubSubServer interface {
mustEmbedUnimplementedPubSubServer()
}
func RegisterPubSubServer(s grpc.ServiceRegistrar, srv PubSubServer) {
s.RegisterService(&PubSub_ServiceDesc, srv)
}
func _PubSub_Publish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(String)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(PubSubServer).Publish(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/pb.PubSub/Publish",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(PubSubServer).Publish(ctx, req.(*String))
}
return interceptor(ctx, in, info, handler)
}
func _PubSub_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(String)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(PubSubServer).Subscribe(m, &pubSubSubscribeServer{stream})
}
type PubSub_SubscribeServer interface {
Send(*String) error
grpc.ServerStream
}
type pubSubSubscribeServer struct {
grpc.ServerStream
}
func (x *pubSubSubscribeServer) Send(m *String) error {
return x.ServerStream.SendMsg(m)
}
// PubSub_ServiceDesc is the grpc.ServiceDesc for PubSub service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var PubSub_ServiceDesc = grpc.ServiceDesc{
ServiceName: "pb.PubSub",
HandlerType: (*PubSubServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Publish",
Handler: _PubSub_Publish_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Subscribe",
Handler: _PubSub_Subscribe_Handler,
ServerStreams: true,
},
},
Metadata: "pb/pubsub.proto",
}

@ -0,0 +1,234 @@
// 版本
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.0
// protoc v3.19.4
// source: pb/task.proto
// 包名
package pb
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// 请求消息
type TaskRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
}
func (x *TaskRequest) Reset() {
*x = TaskRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_task_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *TaskRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*TaskRequest) ProtoMessage() {}
func (x *TaskRequest) ProtoReflect() protoreflect.Message {
mi := &file_pb_task_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use TaskRequest.ProtoReflect.Descriptor instead.
func (*TaskRequest) Descriptor() ([]byte, []int) {
return file_pb_task_proto_rawDescGZIP(), []int{0}
}
func (x *TaskRequest) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
// 响应消息
type TaskResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
}
func (x *TaskResponse) Reset() {
*x = TaskResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_task_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *TaskResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*TaskResponse) ProtoMessage() {}
func (x *TaskResponse) ProtoReflect() protoreflect.Message {
mi := &file_pb_task_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use TaskResponse.ProtoReflect.Descriptor instead.
func (*TaskResponse) Descriptor() ([]byte, []int) {
return file_pb_task_proto_rawDescGZIP(), []int{1}
}
func (x *TaskResponse) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
var File_pb_task_proto protoreflect.FileDescriptor
var file_pb_task_proto_rawDesc = []byte{
0x0a, 0x0d, 0x70, 0x62, 0x2f, 0x74, 0x61, 0x73, 0x6b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12,
0x02, 0x70, 0x62, 0x22, 0x27, 0x0a, 0x0b, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x28, 0x0a, 0x0c,
0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07,
0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d,
0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0xfb, 0x01, 0x0a, 0x04, 0x54, 0x61, 0x73, 0x6b, 0x12,
0x30, 0x0a, 0x09, 0x55, 0x6e, 0x61, 0x72, 0x79, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x0f, 0x2e, 0x70,
0x62, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e,
0x70, 0x62, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22,
0x00, 0x12, 0x3c, 0x0a, 0x13, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x53, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x69, 0x6e, 0x67, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x54, 0x61,
0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x70, 0x62, 0x2e, 0x54,
0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12,
0x3c, 0x0a, 0x13, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69,
0x6e, 0x67, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x54, 0x61, 0x73, 0x6b,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x70, 0x62, 0x2e, 0x54, 0x61, 0x73,
0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x12, 0x45, 0x0a,
0x1a, 0x42, 0x69, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x53, 0x74,
0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x0f, 0x2e, 0x70, 0x62,
0x2e, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x70,
0x62, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00,
0x28, 0x01, 0x30, 0x01, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_pb_task_proto_rawDescOnce sync.Once
file_pb_task_proto_rawDescData = file_pb_task_proto_rawDesc
)
func file_pb_task_proto_rawDescGZIP() []byte {
file_pb_task_proto_rawDescOnce.Do(func() {
file_pb_task_proto_rawDescData = protoimpl.X.CompressGZIP(file_pb_task_proto_rawDescData)
})
return file_pb_task_proto_rawDescData
}
var file_pb_task_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_pb_task_proto_goTypes = []interface{}{
(*TaskRequest)(nil), // 0: pb.TaskRequest
(*TaskResponse)(nil), // 1: pb.TaskResponse
}
var file_pb_task_proto_depIdxs = []int32{
0, // 0: pb.Task.UnaryTask:input_type -> pb.TaskRequest
0, // 1: pb.Task.ServerStreamingTask:input_type -> pb.TaskRequest
0, // 2: pb.Task.ClientStreamingTask:input_type -> pb.TaskRequest
0, // 3: pb.Task.BidirectionalStreamingTask:input_type -> pb.TaskRequest
1, // 4: pb.Task.UnaryTask:output_type -> pb.TaskResponse
1, // 5: pb.Task.ServerStreamingTask:output_type -> pb.TaskResponse
1, // 6: pb.Task.ClientStreamingTask:output_type -> pb.TaskResponse
1, // 7: pb.Task.BidirectionalStreamingTask:output_type -> pb.TaskResponse
4, // [4:8] is the sub-list for method output_type
0, // [0:4] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_pb_task_proto_init() }
func file_pb_task_proto_init() {
if File_pb_task_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_pb_task_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*TaskRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pb_task_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*TaskResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pb_task_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_pb_task_proto_goTypes,
DependencyIndexes: file_pb_task_proto_depIdxs,
MessageInfos: file_pb_task_proto_msgTypes,
}.Build()
File_pb_task_proto = out.File
file_pb_task_proto_rawDesc = nil
file_pb_task_proto_goTypes = nil
file_pb_task_proto_depIdxs = nil
}

@ -0,0 +1,30 @@
//
syntax = "proto3";
//
package pb;
//
option go_package = "../pb";
//
service Task{
//
rpc UnaryTask(TaskRequest) returns (TaskResponse){};
//
rpc ServerStreamingTask(TaskRequest) returns (stream TaskResponse){};
//
rpc ClientStreamingTask(stream TaskRequest) returns (TaskResponse){};
//
rpc BidirectionalStreamingTask(stream TaskRequest) returns (stream TaskResponse){};
}
//
message TaskRequest {
string message = 1;
}
//
message TaskResponse {
string message = 1;
}

@ -0,0 +1,315 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.19.4
// source: pb/task.proto
package pb
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// TaskClient is the client API for Task service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type TaskClient interface {
// 普通一元方法
UnaryTask(ctx context.Context, in *TaskRequest, opts ...grpc.CallOption) (*TaskResponse, error)
// 服务端推送流
ServerStreamingTask(ctx context.Context, in *TaskRequest, opts ...grpc.CallOption) (Task_ServerStreamingTaskClient, error)
// 客户端推送流
ClientStreamingTask(ctx context.Context, opts ...grpc.CallOption) (Task_ClientStreamingTaskClient, error)
// 双向推送流
BidirectionalStreamingTask(ctx context.Context, opts ...grpc.CallOption) (Task_BidirectionalStreamingTaskClient, error)
}
type taskClient struct {
cc grpc.ClientConnInterface
}
func NewTaskClient(cc grpc.ClientConnInterface) TaskClient {
return &taskClient{cc}
}
func (c *taskClient) UnaryTask(ctx context.Context, in *TaskRequest, opts ...grpc.CallOption) (*TaskResponse, error) {
out := new(TaskResponse)
err := c.cc.Invoke(ctx, "/pb.Task/UnaryTask", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *taskClient) ServerStreamingTask(ctx context.Context, in *TaskRequest, opts ...grpc.CallOption) (Task_ServerStreamingTaskClient, error) {
stream, err := c.cc.NewStream(ctx, &Task_ServiceDesc.Streams[0], "/pb.Task/ServerStreamingTask", opts...)
if err != nil {
return nil, err
}
x := &taskServerStreamingTaskClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Task_ServerStreamingTaskClient interface {
Recv() (*TaskResponse, error)
grpc.ClientStream
}
type taskServerStreamingTaskClient struct {
grpc.ClientStream
}
func (x *taskServerStreamingTaskClient) Recv() (*TaskResponse, error) {
m := new(TaskResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *taskClient) ClientStreamingTask(ctx context.Context, opts ...grpc.CallOption) (Task_ClientStreamingTaskClient, error) {
stream, err := c.cc.NewStream(ctx, &Task_ServiceDesc.Streams[1], "/pb.Task/ClientStreamingTask", opts...)
if err != nil {
return nil, err
}
x := &taskClientStreamingTaskClient{stream}
return x, nil
}
type Task_ClientStreamingTaskClient interface {
Send(*TaskRequest) error
CloseAndRecv() (*TaskResponse, error)
grpc.ClientStream
}
type taskClientStreamingTaskClient struct {
grpc.ClientStream
}
func (x *taskClientStreamingTaskClient) Send(m *TaskRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *taskClientStreamingTaskClient) CloseAndRecv() (*TaskResponse, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(TaskResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *taskClient) BidirectionalStreamingTask(ctx context.Context, opts ...grpc.CallOption) (Task_BidirectionalStreamingTaskClient, error) {
stream, err := c.cc.NewStream(ctx, &Task_ServiceDesc.Streams[2], "/pb.Task/BidirectionalStreamingTask", opts...)
if err != nil {
return nil, err
}
x := &taskBidirectionalStreamingTaskClient{stream}
return x, nil
}
type Task_BidirectionalStreamingTaskClient interface {
Send(*TaskRequest) error
Recv() (*TaskResponse, error)
grpc.ClientStream
}
type taskBidirectionalStreamingTaskClient struct {
grpc.ClientStream
}
func (x *taskBidirectionalStreamingTaskClient) Send(m *TaskRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *taskBidirectionalStreamingTaskClient) Recv() (*TaskResponse, error) {
m := new(TaskResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// TaskServer is the server API for Task service.
// All implementations must embed UnimplementedTaskServer
// for forward compatibility
type TaskServer interface {
// 普通一元方法
UnaryTask(context.Context, *TaskRequest) (*TaskResponse, error)
// 服务端推送流
ServerStreamingTask(*TaskRequest, Task_ServerStreamingTaskServer) error
// 客户端推送流
ClientStreamingTask(Task_ClientStreamingTaskServer) error
// 双向推送流
BidirectionalStreamingTask(Task_BidirectionalStreamingTaskServer) error
mustEmbedUnimplementedTaskServer()
}
// UnimplementedTaskServer must be embedded to have forward compatible implementations.
type UnimplementedTaskServer struct {
}
func (UnimplementedTaskServer) UnaryTask(context.Context, *TaskRequest) (*TaskResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method UnaryTask not implemented")
}
func (UnimplementedTaskServer) ServerStreamingTask(*TaskRequest, Task_ServerStreamingTaskServer) error {
return status.Errorf(codes.Unimplemented, "method ServerStreamingTask not implemented")
}
func (UnimplementedTaskServer) ClientStreamingTask(Task_ClientStreamingTaskServer) error {
return status.Errorf(codes.Unimplemented, "method ClientStreamingTask not implemented")
}
func (UnimplementedTaskServer) BidirectionalStreamingTask(Task_BidirectionalStreamingTaskServer) error {
return status.Errorf(codes.Unimplemented, "method BidirectionalStreamingTask not implemented")
}
func (UnimplementedTaskServer) mustEmbedUnimplementedTaskServer() {}
// UnsafeTaskServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to TaskServer will
// result in compilation errors.
type UnsafeTaskServer interface {
mustEmbedUnimplementedTaskServer()
}
func RegisterTaskServer(s grpc.ServiceRegistrar, srv TaskServer) {
s.RegisterService(&Task_ServiceDesc, srv)
}
func _Task_UnaryTask_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(TaskRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(TaskServer).UnaryTask(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/pb.Task/UnaryTask",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(TaskServer).UnaryTask(ctx, req.(*TaskRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Task_ServerStreamingTask_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(TaskRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(TaskServer).ServerStreamingTask(m, &taskServerStreamingTaskServer{stream})
}
type Task_ServerStreamingTaskServer interface {
Send(*TaskResponse) error
grpc.ServerStream
}
type taskServerStreamingTaskServer struct {
grpc.ServerStream
}
func (x *taskServerStreamingTaskServer) Send(m *TaskResponse) error {
return x.ServerStream.SendMsg(m)
}
func _Task_ClientStreamingTask_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(TaskServer).ClientStreamingTask(&taskClientStreamingTaskServer{stream})
}
type Task_ClientStreamingTaskServer interface {
SendAndClose(*TaskResponse) error
Recv() (*TaskRequest, error)
grpc.ServerStream
}
type taskClientStreamingTaskServer struct {
grpc.ServerStream
}
func (x *taskClientStreamingTaskServer) SendAndClose(m *TaskResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *taskClientStreamingTaskServer) Recv() (*TaskRequest, error) {
m := new(TaskRequest)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func _Task_BidirectionalStreamingTask_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(TaskServer).BidirectionalStreamingTask(&taskBidirectionalStreamingTaskServer{stream})
}
type Task_BidirectionalStreamingTaskServer interface {
Send(*TaskResponse) error
Recv() (*TaskRequest, error)
grpc.ServerStream
}
type taskBidirectionalStreamingTaskServer struct {
grpc.ServerStream
}
func (x *taskBidirectionalStreamingTaskServer) Send(m *TaskResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *taskBidirectionalStreamingTaskServer) Recv() (*TaskRequest, error) {
m := new(TaskRequest)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// Task_ServiceDesc is the grpc.ServiceDesc for Task service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Task_ServiceDesc = grpc.ServiceDesc{
ServiceName: "pb.Task",
HandlerType: (*TaskServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "UnaryTask",
Handler: _Task_UnaryTask_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "ServerStreamingTask",
Handler: _Task_ServerStreamingTask_Handler,
ServerStreams: true,
},
{
StreamName: "ClientStreamingTask",
Handler: _Task_ClientStreamingTask_Handler,
ClientStreams: true,
},
{
StreamName: "BidirectionalStreamingTask",
Handler: _Task_BidirectionalStreamingTask_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "pb/task.proto",
}

@ -0,0 +1,125 @@
package pubsub
import (
"sync"
"time"
)
// 等待组放在共享内存池中减少GC
var wgPool = sync.Pool{New: func() interface{} { return new(sync.WaitGroup) }}
// NewPublisher
// 第一个参数控制发布时最大阻塞时间
// 第二个参数是缓冲区大小控制每个订阅者的chan缓冲区大小
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
return &Publisher{
buffer: buffer,
timeout: publishTimeout,
subscribers: make(map[subscriber]topicFunc),
}
}
type subscriber chan interface{}
type topicFunc func(v interface{}) bool
type Publisher struct {
m sync.RWMutex // 控制订阅者map并发读写安全
buffer int // 每个订阅者chan缓冲区大小
timeout time.Duration // 发布阻塞超时时间
subscribers map[subscriber]topicFunc
}
// Len 返回订阅者数量
func (p *Publisher) Len() int {
p.m.RLock()
i := len(p.subscribers)
p.m.RUnlock()
return i
}
// Subscribe 无Topic订阅
func (p *Publisher) Subscribe() chan interface{} {
return p.SubscribeTopic(nil)
}
// SubscribeTopic 通过Topic订阅
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
ch := make(chan interface{}, p.buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return ch
}
// SubscribeTopicWithBuffer 通过自定义chan缓冲区大小定义新的订阅者
func (p *Publisher) SubscribeTopicWithBuffer(topic topicFunc, buffer int) chan interface{} {
ch := make(chan interface{}, buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return ch
}
// Evict 移除某个订阅者
func (p *Publisher) Evict(sub chan interface{}) {
p.m.Lock()
_, exists := p.subscribers[sub]
if exists {
delete(p.subscribers, sub)
close(sub)
}
p.m.Unlock()
}
// Publish 发布消息
func (p *Publisher) Publish(v interface{}) {
p.m.RLock()
if len(p.subscribers) == 0 {
p.m.RUnlock()
return
}
wg := wgPool.Get().(*sync.WaitGroup)
for sub, topic := range p.subscribers {
wg.Add(1)
go p.sendTopic(sub, topic, v, wg)
}
wg.Wait()
wgPool.Put(wg)
p.m.RUnlock()
}
// Close 关闭服务
func (p *Publisher) Close() {
p.m.Lock()
for sub := range p.subscribers {
delete(p.subscribers, sub)
close(sub)
}
p.m.Unlock()
}
// 真正发布消息的逻辑通过Timer根据传入的timeout控制每次发布消息最大阻塞时长
func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
defer wg.Done()
if topic != nil && !topic(v) {
return
}
// 如果接收器不可用,请在选择“不阻止”下发送
if p.timeout > 0 {
timeout := time.NewTimer(p.timeout)
defer timeout.Stop()
select {
case sub <- v:
case <-timeout.C:
}
return
}
select {
case sub <- v:
default:
}
}

@ -0,0 +1,68 @@
package goip
import (
"errors"
"go.dtapp.net/gojobs/pb"
"go.dtapp.net/gojobs/pubsub"
"google.golang.org/grpc"
"log"
"net"
"time"
)
// ServerConfig 服务配置
type ServerConfig struct {
PublishTimeout time.Duration // 控制发布时最大阻塞时间
PubBuffer int // 缓冲区大小控制每个订阅者的chan缓冲区大小
Address string // 服务端口 0.0.0.0:8888
}
// Server 服务
type Server struct {
ServerConfig // 配置
Pub *pubsub.Publisher // 订阅
Conn *grpc.Server // 链接信息
}
// NewServer 创建服务和注册
func NewServer(config *ServerConfig) *Server {
if config.Address == "" {
panic("请填写服务端口")
}
s := &Server{}
s.PublishTimeout = config.PublishTimeout
s.PubBuffer = config.PubBuffer
s.Address = config.Address
s.Pub = pubsub.NewPublisher(config.PublishTimeout, config.PubBuffer)
// 创建gRPC服务器
s.Conn = grpc.NewServer()
// 注册
pb.RegisterPubSubServer(s.Conn, pb.NewPubSubServerService())
return s
}
// StartUp 启动
func (s *Server) StartUp() error {
// 监听本地端口
lis, err := net.Listen("tcp", s.Address)
if err != nil {
return errors.New("创建监听失败:" + err.Error())
}
log.Printf("正在监听的地址:%v", lis.Addr())
// 启动grpc
err = s.Conn.Serve(lis)
if err != nil {
return errors.New("创建服务失败:" + err.Error())
}
return nil
}

@ -0,0 +1,3 @@
package goip
const Version = "1.0.0"

@ -0,0 +1,7 @@
package goip
import "testing"
func TestVersion(t *testing.T) {
t.Log(Version)
}

@ -0,0 +1,43 @@
package goip
import (
"go.dtapp.net/gojobs/pb"
"google.golang.org/grpc"
)
// WorkerConfig 工作配置
type WorkerConfig struct {
Address string // 服务端口 127.0.0.1:8888
}
// Worker 工作
type Worker struct {
WorkerConfig // 配置
Pub pb.PubSubClient // 订阅
Conn *grpc.ClientConn // 链接信息
}
// NewWorker 创建工作
func NewWorker(config *WorkerConfig) *Worker {
if config.Address == "" {
panic("请填写服务端口")
}
w := &Worker{}
w.Address = config.Address
var err error
// 建立连接 获取client
w.Conn, err = grpc.Dial(w.Address, grpc.WithInsecure())
if err != nil {
panic("连接失败: " + err.Error())
}
// 新建一个客户端
w.Pub = pb.NewPubSubClient(w.Conn)
return w
}
Loading…
Cancel
Save