- update wechatopen

master
李光春 1 year ago
parent c238ba648a
commit 470f5dba37

@ -10,7 +10,7 @@ require (
github.com/basgys/goxml2json v1.1.0
github.com/bytedance/sonic v1.8.8
github.com/gin-gonic/gin v1.9.0
github.com/go-co-op/gocron v1.25.0
github.com/go-co-op/gocron v1.26.0
github.com/go-playground/locales v0.14.1
github.com/go-playground/universal-translator v0.18.1
github.com/go-playground/validator/v10 v10.13.0
@ -32,7 +32,7 @@ require (
github.com/tencentyun/cos-go-sdk-v5 v0.7.41
go.mongodb.org/mongo-driver v1.11.6
go.uber.org/zap v1.24.0
golang.org/x/crypto v0.8.0
golang.org/x/crypto v0.9.0
golang.org/x/text v0.9.0
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
gorm.io/datatypes v1.2.0
@ -98,11 +98,11 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/mod v0.10.0 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.8.0 // indirect
golang.org/x/tools v0.9.1 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect

@ -106,8 +106,8 @@ github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.9.0 h1:OjyFBKICoexlu99ctXNR2gg+c5pKrKMuyjgARg9qeY8=
github.com/gin-gonic/gin v1.9.0/go.mod h1:W1Me9+hsUSyj3CePGrd1/QrKJMSJ1Tu/0hFEH89961k=
github.com/go-co-op/gocron v1.25.0 h1:pzAdtily1JVIf6lGby6K0JKzhishgLOllQgNxoYbR+8=
github.com/go-co-op/gocron v1.25.0/go.mod h1:JHrQDY4iE1HZPkgTyoccY4xtDgLbrUwL+xODIbEQdnc=
github.com/go-co-op/gocron v1.26.0 h1:dbX2xdy8tRE2o02PYhtYmK8WCBL8j7tVn/qgETBLL1g=
github.com/go-co-op/gocron v1.26.0/go.mod h1:JHrQDY4iE1HZPkgTyoccY4xtDgLbrUwL+xODIbEQdnc=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgOZ7o=
@ -613,8 +613,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ=
golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE=
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
@ -651,8 +651,8 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -717,7 +717,7 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.7.0 h1:BEvjmm5fURWqcfbSKTdpkDXYBrUS1c0m8agp14W48vQ=
golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
@ -753,8 +753,8 @@ golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapK
golang.org/x/tools v0.0.0-20201124115921-2c860bdd6e78/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.8.0 h1:vSDcovVPld282ceKgDimkRSC8kpaH1dgyc9UMzlt84Y=
golang.org/x/tools v0.8.0/go.mod h1:JxBZ99ISMI5ViVkT1tr6tdNmXeTrcpVSD3vZ1RsRdN4=
golang.org/x/tools v0.9.1 h1:8WMNJAz3zrtPmnYC7ISf5dEn3MT0gY7jBJfw27yrrLo=
golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

@ -1,5 +1,5 @@
package go_library
func Version() string {
return "1.0.139"
return "1.0.140"
}

@ -25,10 +25,6 @@ func newWxaDeleteTemplateResult(result WxaDeleteTemplateResponse, body []byte, h
// WxaDeleteTemplate deletes the specified code template
// https://developers.weixin.qq.com/doc/oplatform/Third-party_Platforms/2.0/api/ThirdParty/code_template/deletetemplate.html
func (c *Client) WxaDeleteTemplate(ctx context.Context, templateId string, notMustParams ...gorequest.Params) (*WxaDeleteTemplateResult, error) {
// check
if err := c.checkAuthorizerConfig(ctx); err != nil {
return newWxaDeleteTemplateResult(WxaDeleteTemplateResponse{}, []byte{}, gorequest.Response{}), err
}
// parameters
params := gorequest.NewParamsWith(notMustParams...)
params.Set("template_id", templateId)

@ -61,6 +61,9 @@ s.Every(2).MonthLastDay().Do(func(){ ... })
// cron expressions supported
s.Cron("*/1 * * * *").Do(task) // every minute
// cron second-level expressions supported
s.CronWithSeconds("*/1 * * * * *").Do(task) // every second
// you can start running the scheduler in two different ways:
// starts the scheduler asynchronously
s.StartAsync()
@ -85,16 +88,16 @@ For more examples, take a look in our [go docs](https://pkg.go.dev/github.com/go
There are several options available to restrict how jobs run:
| Mode | Function | Behavior |
|----------------------------|---------------------------|------------------------------------------------------------------------------------------------------|
| Default | | jobs are rescheduled at every interval |
| Job singleton | `SingletonMode()` | a long running job will not be rescheduled until the current run is completed |
| Scheduler limit | `SetMaxConcurrentJobs()` | set a collective maximum number of concurrent jobs running across the scheduler |
| Distributed locking (BETA) | `WithDistributedLocker()` | prevents the same job from being run more than once when running multiple instances of the scheduler |
| Mode | Function | Behavior |
|---------------------|---------------------------|------------------------------------------------------------------------------------------------------|
| Default | | jobs are rescheduled at every interval |
| Job singleton | `SingletonMode()` | a long running job will not be rescheduled until the current run is completed |
| Scheduler limit | `SetMaxConcurrentJobs()` | set a collective maximum number of concurrent jobs running across the scheduler |
| Distributed locking | `WithDistributedLocker()` | prevents the same job from being run more than once when running multiple instances of the scheduler |
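The modes above compose on a single scheduler. A minimal sketch combining a scheduler-wide limit with a singleton job (assumes the usual `time` and `gocron` imports; durations and the job body are illustrative):

```go
s := gocron.NewScheduler(time.UTC)

// scheduler-wide cap: at most 2 jobs run at once; overflow runs are
// rescheduled (gocron.WaitMode would queue them instead)
s.SetMaxConcurrentJobs(2, gocron.RescheduleMode)

// job singleton: a long run blocks its own rescheduling, not other jobs
s.Every(1).Second().SingletonMode().Do(func() {
    time.Sleep(3 * time.Second)
})

s.StartAsync()
```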
## Distributed Locker Implementations
- Redis: [redis.go](lockers/redislock/redislock.go) `go get github.com/go-co-op/gocron/lockers/redislock`
- Redis: [redislock](lockers/redislock/README.md) `go get github.com/go-co-op/gocron/lockers/redislock`
## Tags
@ -122,8 +125,8 @@ s.RunByTag("tag")
- Q: I'm running multiple pods in a distributed environment. How can I prevent a job from running once per pod, causing duplication?
- A1: We recommend using your own lock solution within the jobs themselves (you could use [Redis](https://redis.io/topics/distlock), for example)
- A2: Currently in BETA (please provide feedback): Use the scheduler option `WithDistributedLocker` and either use an implemented backend
or implement your own and contribute it back in a PR (we hope)!
- A2: Use the scheduler option `WithDistributedLocker` and either use an implemented [backend](lockers)
or implement your own and contribute it back in a PR!
- Q: I've removed my job from the scheduler, but how can I stop a long-running job that has already been triggered?
- A: We recommend using a means of canceling your job, e.g. a `context.WithCancel()`.
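A minimal sketch of that cancellation pattern (assumes a scheduler `s` and the usual `context`/`time` imports; the work loop is illustrative):

```go
ctx, cancel := context.WithCancel(context.Background())

s.Every(1).Minute().Do(func() {
    select {
    case <-ctx.Done(): // the job was asked to stop
        return
    case <-time.After(30 * time.Second): // stand-in for long-running work
    }
})

// after removing the job from the scheduler:
cancel() // unblocks a run still in flight
```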

@ -1,5 +0,0 @@
version: "3.8"
services:
redis:
image: redis:6.2-alpine

@ -48,6 +48,7 @@ type executor struct {
limitModeFuncsRunning *atomic.Int64 // tracks the count of limited mode funcs running
limitModeFuncWg *sync.WaitGroup // allow the executor to wait for limit mode functions to wrap up
limitModeQueue chan jobFunction // pass job functions to the limit mode workers
limitModeQueueMu *sync.Mutex // for protecting the limitModeQueue
limitModeRunningJobs *atomic.Int64 // tracks the count of running jobs to check against the max
stopped *atomic.Bool // allow workers to drain the buffered limitModeQueue
@ -61,7 +62,7 @@ func newExecutor() executor {
limitModeFuncsRunning: &atomic.Int64{},
limitModeFuncWg: &sync.WaitGroup{},
limitModeRunningJobs: &atomic.Int64{},
limitModeQueue: make(chan jobFunction, 1000),
limitModeQueueMu: &sync.Mutex{},
}
return e
}
@ -96,7 +97,6 @@ func (jf *jobFunction) singletonRunner() {
}
func (e *executor) limitModeRunner() {
e.limitModeFuncWg.Add(1)
for {
select {
case <-e.ctx.Done():
@ -105,7 +105,7 @@ func (e *executor) limitModeRunner() {
return
case jf := <-e.limitModeQueue:
if !e.stopped.Load() {
runJob(jf)
e.runJob(jf)
}
}
}
@ -122,9 +122,48 @@ func (e *executor) start() {
e.jobsWg = &sync.WaitGroup{}
e.stopped = &atomic.Bool{}
e.limitModeQueueMu.Lock()
e.limitModeQueue = make(chan jobFunction, 1000)
e.limitModeQueueMu.Unlock()
go e.run()
}
func (e *executor) runJob(f jobFunction) {
switch f.runConfig.mode {
case defaultMode:
lockKey := f.jobName
if lockKey == "" {
lockKey = f.funcName
}
if e.distributedLocker != nil {
l, err := e.distributedLocker.Lock(f.ctx, lockKey)
if err != nil || l == nil {
return
}
defer func() {
durationToNextRun := time.Until(f.jobFuncNextRun)
if durationToNextRun > time.Second*5 {
durationToNextRun = time.Second * 5
}
if durationToNextRun > time.Millisecond*100 {
timeToSleep := time.Duration(float64(durationToNextRun) * 0.9)
time.Sleep(timeToSleep)
}
_ = l.Unlock(f.ctx)
}()
}
runJob(f)
case singletonMode:
e.singletonWgs.Store(f.singletonWg, struct{}{})
if !f.singletonRunnerOn.Load() {
go f.singletonRunner()
}
f.singletonQueue <- struct{}{}
}
}
func (e *executor) run() {
for {
select {
@ -138,6 +177,7 @@ func (e *executor) run() {
if countRunning < int64(e.limitModeMaxRunningJobs) {
diff := int64(e.limitModeMaxRunningJobs) - countRunning
for i := int64(0); i < diff; i++ {
e.limitModeFuncWg.Add(1)
go e.limitModeRunner()
e.limitModeFuncsRunning.Add(1)
}
@ -154,7 +194,7 @@ func (e *executor) run() {
if panicHandler != nil {
defer func() {
if r := recover(); r != any(nil) {
panicHandler(f.name, r)
panicHandler(f.funcName, r)
}
}()
}
@ -163,42 +203,21 @@ func (e *executor) run() {
switch e.limitMode {
case RescheduleMode:
if e.limitModeRunningJobs.Load() < int64(e.limitModeMaxRunningJobs) {
e.limitModeQueue <- f
select {
case e.limitModeQueue <- f:
case <-e.ctx.Done():
}
}
case WaitMode:
e.limitModeQueue <- f
select {
case e.limitModeQueue <- f:
case <-e.ctx.Done():
}
}
return
}
switch f.runConfig.mode {
case defaultMode:
if e.distributedLocker != nil {
l, err := e.distributedLocker.Lock(f.ctx, f.name)
if err != nil || l == nil {
return
}
defer func() {
durationToNextRun := time.Until(f.jobFuncNextRun)
if durationToNextRun > time.Second*5 {
durationToNextRun = time.Second * 5
}
if durationToNextRun > time.Millisecond*100 {
timeToSleep := time.Duration(float64(durationToNextRun) * 0.9)
time.Sleep(timeToSleep)
}
_ = l.Unlock(f.ctx)
}()
}
runJob(f)
case singletonMode:
e.singletonWgs.Store(f.singletonWg, struct{}{})
if !f.singletonRunnerOn.Load() {
go f.singletonRunner()
}
f.singletonQueue <- struct{}{}
}
e.runJob(f)
}()
case <-e.ctx.Done():
e.jobsWg.Wait()
@ -222,5 +241,8 @@ func (e *executor) stop() {
}
if e.limitModeMaxRunningJobs > 0 {
e.limitModeFuncWg.Wait()
e.limitModeQueueMu.Lock()
e.limitModeQueue = nil
e.limitModeQueueMu.Unlock()
}
}

@ -28,7 +28,7 @@ var (
// SetPanicHandler sets the global panicHandler to the given function.
// Leaving it nil or setting it to nil disables automatic panic handling.
// If the panicHandler is not nil, any panic that occurs during executing a job will be recovered
// and the panicHandlerFunc will be called with the job's name and the recover data.
// and the panicHandlerFunc will be called with the job's funcName and the recover data.
func SetPanicHandler(handler PanicHandlerFunc) {
panicHandlerMutex.Lock()
defer panicHandlerMutex.Unlock()
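// A minimal handler sketch (assuming gocron's PanicHandlerFunc signature
// func(jobName string, recoverData interface{}); the log format is illustrative):
//
//	gocron.SetPanicHandler(func(funcName string, recoverData interface{}) {
//		log.Printf("panic in job %q: %v", funcName, recoverData)
//	})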

@ -45,7 +45,8 @@ type jobFunction struct {
function any // task's function
parameters []any // task's function parameters
parametersLen int // length of the passed parameters
name string // nolint the function name to run
jobName string // key of the distributed lock
funcName string // the name of the function - e.g. main.func1
runConfig runConfig // configuration for how many times to run the job
singletonQueue chan struct{} // queues jobs for the singleton runner to handle
singletonRunnerOn *atomic.Bool // whether the runner function for singleton is running
@ -74,7 +75,8 @@ func (jf *jobFunction) copy() jobFunction {
function: jf.function,
parameters: nil,
parametersLen: jf.parametersLen,
name: jf.name,
funcName: jf.funcName,
jobName: jf.jobName,
runConfig: jf.runConfig,
singletonQueue: jf.singletonQueue,
ctx: jf.ctx,
@ -135,6 +137,16 @@ func newJob(interval int, startImmediately bool, singletonMode bool) *Job {
return job
}
// Name sets the name of the current job.
//
// If the scheduler is running using WithDistributedLocker(),
// the job name is used as the distributed lock key.
func (j *Job) Name(name string) {
j.mu.Lock()
defer j.mu.Unlock()
j.jobName = name
}
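// A usage sketch via the job handle (scheduler s, func task, and the
// name "sync-users" are illustrative):
//
//	j, _ := s.Every(1).Minute().Do(task)
//	j.Name("sync-users")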
func (j *Job) setRandomInterval(a, b int) {
j.random.rand = rand.New(rand.NewSource(time.Now().UnixNano())) // nolint

@ -7,6 +7,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/robfig/cron/v3"
@ -22,8 +23,7 @@ type Scheduler struct {
locationMutex sync.RWMutex
location *time.Location
runningMutex sync.RWMutex
running bool // represents if the scheduler is running at the moment or not
running *atomic.Bool // represents if the scheduler is running at the moment or not
time TimeWrapper // wrapper around time.Time
timer func(d time.Duration, f func()) *time.Timer
@ -58,7 +58,7 @@ func NewScheduler(loc *time.Location) *Scheduler {
return &Scheduler{
jobs: make([]*Job, 0),
location: loc,
running: false,
running: &atomic.Bool{},
time: &trueTime{},
executor: &executor,
tagsUnique: false,
@ -82,7 +82,12 @@ func (s *Scheduler) StartBlocking() {
s.startBlockingStopChanMutex.Lock()
s.startBlockingStopChan = make(chan struct{}, 1)
s.startBlockingStopChanMutex.Unlock()
<-s.startBlockingStopChan
s.startBlockingStopChanMutex.Lock()
s.startBlockingStopChan = nil
s.startBlockingStopChanMutex.Unlock()
}
// StartAsync starts all jobs without blocking the current thread
@ -111,16 +116,12 @@ func (s *Scheduler) runJobs(jobs []*Job) {
}
func (s *Scheduler) setRunning(b bool) {
s.runningMutex.Lock()
defer s.runningMutex.Unlock()
s.running = b
s.running.Store(b)
}
// IsRunning returns true if the scheduler is running
func (s *Scheduler) IsRunning() bool {
s.runningMutex.RLock()
defer s.runningMutex.RUnlock()
return s.running
return s.running.Load()
}
// Jobs returns the list of Jobs from the Scheduler
@ -130,6 +131,16 @@ func (s *Scheduler) Jobs() []*Job {
return s.jobs
}
// Name sets the name of the current job.
//
// If the scheduler is running using WithDistributedLocker(), the job name is used
// as the distributed lock key.
func (s *Scheduler) Name(name string) *Scheduler {
job := s.getCurrentJob()
job.jobName = name
return s
}
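// The chained equivalent of Job.Name above (name is illustrative); with
// WithDistributedLocker set, "sync-users" becomes the lock key:
//
//	s.Every(1).Minute().Name("sync-users").Do(task)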
func (s *Scheduler) setJobs(jobs []*Job) {
s.jobsMutex.Lock()
defer s.jobsMutex.Unlock()
@ -662,7 +673,7 @@ func (s *Scheduler) Remove(job any) {
j := s.findJobByTaskName(fName)
s.removeJobsUniqueTags(j)
s.removeByCondition(func(someJob *Job) bool {
return someJob.name == fName
return someJob.funcName == fName
})
}
@ -678,7 +689,7 @@ func (s *Scheduler) RemoveByReference(job *Job) {
func (s *Scheduler) findJobByTaskName(name string) *Job {
for _, job := range s.Jobs() {
if job.name == name {
if job.funcName == name {
return job
}
}
@ -824,7 +835,7 @@ func (s *Scheduler) SingletonModeAll() {
// TaskPresent checks if specific job's function was added to the scheduler.
func (s *Scheduler) TaskPresent(j any) bool {
for _, job := range s.Jobs() {
if job.name == getFunctionName(j) {
if job.funcName == getFunctionName(j) {
return true
}
}
@ -872,10 +883,10 @@ func (s *Scheduler) Stop() {
}
func (s *Scheduler) stop() {
s.setRunning(false)
s.stopJobs(s.jobs)
s.executor.stop()
s.StopBlockingChan()
s.setRunning(false)
}
func (s *Scheduler) stopJobs(jobs []*Job) {
@ -919,10 +930,10 @@ func (s *Scheduler) doCommon(jobFun any, params ...any) (*Job, error) {
}
fname := getFunctionName(jobFun)
if job.name != fname {
if job.funcName != fname {
job.function = jobFun
job.parameters = params
job.name = fname
job.funcName = fname
}
f := reflect.ValueOf(jobFun)
@ -1380,8 +1391,8 @@ func (s *Scheduler) CustomTimer(customTimer func(d time.Duration, f func()) *tim
func (s *Scheduler) StopBlockingChan() {
s.startBlockingStopChanMutex.Lock()
if s.startBlockingStopChan != nil {
s.startBlockingStopChan <- struct{}{}
if s.IsRunning() && s.startBlockingStopChan != nil {
close(s.startBlockingStopChan)
}
s.startBlockingStopChanMutex.Unlock()
}
@ -1400,8 +1411,13 @@ func (s *Scheduler) StopBlockingChan() {
// Another strategy is to use the Cron or CronWithSeconds methods as they
// use the same behavior described above using StartAt.
//
// NOTE - the Locker will NOT lock jobs using any of the limiting functions:
// SingletonMode, SingletonModeAll or SetMaxConcurrentJobs
// NOTE - the Locker will NOT lock jobs using the singleton options:
// SingletonMode or SingletonModeAll
//
// NOTE - beware of potential race conditions when running the Locker
// with SetMaxConcurrentJobs and WaitMode as jobs are not guaranteed
// to be locked when each scheduler is below its limit and able
// to run the job.
func (s *Scheduler) WithDistributedLocker(l Locker) {
s.executor.distributedLocker = l
}
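// A wiring sketch using the redislock backend referenced in the README
// (constructor name and signature as documented in lockers/redislock;
// the redis address is illustrative):
//
//	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
//	locker, err := redislock.NewRedisLocker(client)
//	if err != nil {
//		// handle error
//	}
//	s.WithDistributedLocker(locker)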

@ -99,14 +99,20 @@ Care should be taken when parsing and interpreting HTML, whether full documents
or fragments, within the framework of the HTML specification, especially with
regard to untrusted inputs.
This package provides both a tokenizer and a parser. Only the parser constructs
a DOM according to the HTML specification, resolving malformed and misplaced
tags where appropriate. The tokenizer simply tokenizes the HTML presented to it,
and as such does not resolve issues that may exist in the processed HTML,
producing a literal interpretation of the input.
If your use case requires semantically well-formed HTML, as defined by the
WHATWG specification, the parser should be used rather than the tokenizer.
This package provides both a tokenizer and a parser, which implement the
tokenization, and tokenization and tree construction stages of the WHATWG HTML
parsing specification respectively. While the tokenizer parses and normalizes
individual HTML tokens, only the parser constructs the DOM tree from the
tokenized HTML, as described in the tree construction stage of the
specification, dynamically modifying or extending the document's DOM tree.
If your use case requires semantically well-formed HTML documents, as defined by
the WHATWG specification, the parser should be used rather than the tokenizer.
In security contexts, if trust decisions are being made using the tokenized or
parsed content, the input must be re-serialized (for instance by using Render or
Token.String) in order for those trust decisions to hold, as the process of
tokenization or parsing may alter the content.
*/
package html // import "golang.org/x/net/html"
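// A minimal parse-and-render sketch: Parse builds the DOM and Render
// re-serializes it, the step the security note above calls for before
// trust decisions (the input fragment and imports are illustrative):
//
//	doc, err := html.Parse(strings.NewReader("<p>hi"))
//	if err != nil {
//		// handle error
//	}
//	var buf bytes.Buffer
//	_ = html.Render(&buf, doc) // buf now holds the normalized markup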

@ -1266,6 +1266,27 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
return res, nil
}
cancelRequest := func(cs *clientStream, err error) error {
cs.cc.mu.Lock()
defer cs.cc.mu.Unlock()
cs.abortStreamLocked(err)
if cs.ID != 0 {
// This request may have failed because of a problem with the connection,
// or for some unrelated reason. (For example, the user might have canceled
// the request without waiting for a response.) Mark the connection as
// not reusable, since trying to reuse a dead connection is worse than
// unnecessarily creating a new one.
//
// If cs.ID is 0, then the request was never allocated a stream ID and
// whatever went wrong was unrelated to the connection. We might have
// timed out waiting for a stream slot when StrictMaxConcurrentStreams
// is set, for example, in which case retrying on a different connection
// will not help.
cs.cc.doNotReuse = true
}
return err
}
for {
select {
case <-cs.respHeaderRecv:
@ -1280,15 +1301,12 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
return handleResponseHeaders()
default:
waitDone()
return nil, cs.abortErr
return nil, cancelRequest(cs, cs.abortErr)
}
case <-ctx.Done():
err := ctx.Err()
cs.abortStream(err)
return nil, err
return nil, cancelRequest(cs, ctx.Err())
case <-cs.reqCancel:
cs.abortStream(errRequestCanceled)
return nil, errRequestCanceled
return nil, cancelRequest(cs, errRequestCanceled)
}
}
}
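// A client-side sketch of the paths above: canceling the request context
// aborts the stream, and cancelRequest marks the connection as not
// reusable (client, URL, and timeout are illustrative):
//
//	client := &http.Client{Transport: &http2.Transport{}}
//	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
//	defer cancel()
//	req, _ := http.NewRequestWithContext(ctx, "GET", "https://example.com", nil)
//	_, err := client.Do(req) // ctx expiry lands in the <-ctx.Done() case above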

@ -113,6 +113,20 @@ const (
opObj = 'O' // .Obj() (Named, TypeParam)
)
// For is equivalent to new(Encoder).For(obj).
//
// It may be more efficient to reuse a single Encoder across several calls.
func For(obj types.Object) (Path, error) {
return new(Encoder).For(obj)
}
// An Encoder amortizes the cost of encoding the paths of multiple objects.
// The zero value of an Encoder is ready to use.
type Encoder struct {
scopeNamesMemo map[*types.Scope][]string // memoization of Scope.Names()
namedMethodsMemo map[*types.Named][]*types.Func // memoization of namedMethods()
}
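// A reuse sketch (objs is an illustrative []types.Object; sharing one
// Encoder means scope names and method tables are computed once):
//
//	var enc objectpath.Encoder // zero value is ready to use
//	for _, obj := range objs {
//		if p, err := enc.For(obj); err == nil {
//			fmt.Println(p)
//		}
//	}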
// For returns the path to an object relative to its package,
// or an error if the object is not accessible from the package's Scope.
//
@ -145,24 +159,7 @@ const (
// .Type().Field(0) (field Var X)
//
// where p is the package (*types.Package) to which X belongs.
func For(obj types.Object) (Path, error) {
return newEncoderFor()(obj)
}
// An encoder amortizes the cost of encoding the paths of multiple objects.
// Nonexported pending approval of proposal 58668.
type encoder struct {
scopeNamesMemo map[*types.Scope][]string // memoization of Scope.Names()
namedMethodsMemo map[*types.Named][]*types.Func // memoization of namedMethods()
}
// Exposed to gopls via golang.org/x/tools/internal/typesinternal
// pending approval of proposal 58668.
//
//go:linkname newEncoderFor
func newEncoderFor() func(types.Object) (Path, error) { return new(encoder).For }
func (enc *encoder) For(obj types.Object) (Path, error) {
func (enc *Encoder) For(obj types.Object) (Path, error) {
pkg := obj.Pkg()
// This table lists the cases of interest.
@ -341,7 +338,7 @@ func appendOpArg(path []byte, op byte, arg int) []byte {
// This function is just an optimization that avoids the general scope walking
// approach. You are expected to fall back to the general approach if this
// function fails.
func (enc *encoder) concreteMethod(meth *types.Func) (Path, bool) {
func (enc *Encoder) concreteMethod(meth *types.Func) (Path, bool) {
// Concrete methods can only be declared on package-scoped named types. For
// that reason we can skip the expensive walk over the package scope: the
// path will always be package -> named type -> method. We can trivially get
@ -421,7 +418,13 @@ func (enc *encoder) concreteMethod(meth *types.Func) (Path, bool) {
}
}
panic(fmt.Sprintf("couldn't find method %s on type %s", meth, named))
// Due to golang/go#59944, go/types fails to associate the receiver with
// certain methods on cgo types.
//
// TODO(rfindley): replace this panic once golang/go#59944 is fixed in all Go
// versions gopls supports.
return "", false
// panic(fmt.Sprintf("couldn't find method %s on type %s; methods: %#v", meth, named, enc.namedMethods(named)))
}
// find finds obj within type T, returning the path to it, or nil if not found.
@ -730,23 +733,8 @@ func namedMethods(named *types.Named) []*types.Func {
return methods
}
// scopeNames is a memoization of scope.Names. Callers must not modify the result.
func (enc *encoder) scopeNames(scope *types.Scope) []string {
m := enc.scopeNamesMemo
if m == nil {
m = make(map[*types.Scope][]string)
enc.scopeNamesMemo = m
}
names, ok := m[scope]
if !ok {
names = scope.Names() // allocates and sorts
m[scope] = names
}
return names
}
// namedMethods is a memoization of the namedMethods function. Callers must not modify the result.
func (enc *encoder) namedMethods(named *types.Named) []*types.Func {
func (enc *Encoder) namedMethods(named *types.Named) []*types.Func {
m := enc.namedMethodsMemo
if m == nil {
m = make(map[*types.Named][]*types.Func)
@ -758,5 +746,19 @@ func (enc *encoder) namedMethods(named *types.Named) []*types.Func {
m[named] = methods
}
return methods
}
// scopeNames is a memoization of scope.Names. Callers must not modify the result.
func (enc *Encoder) scopeNames(scope *types.Scope) []string {
m := enc.scopeNamesMemo
if m == nil {
m = make(map[*types.Scope][]string)
enc.scopeNamesMemo = m
}
names, ok := m[scope]
if !ok {
names = scope.Names() // allocates and sorts
m[scope] = names
}
return names
}

@ -8,10 +8,12 @@ package gocommand
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log"
"os"
"reflect"
"regexp"
"runtime"
"strconv"
@ -215,6 +217,18 @@ func (i *Invocation) run(ctx context.Context, stdout, stderr io.Writer) error {
cmd := exec.Command("go", goArgs...)
cmd.Stdout = stdout
cmd.Stderr = stderr
// cmd.WaitDelay was added only in go1.20 (see #50436).
if waitDelay := reflect.ValueOf(cmd).Elem().FieldByName("WaitDelay"); waitDelay.IsValid() {
// https://go.dev/issue/59541: don't wait forever copying stderr
// after the command has exited.
// After CL 484741 we copy stdout manually, so we'll stop reading that as
// soon as ctx is done. However, we also don't want to wait around forever
// for stderr. Give a much-longer-than-reasonable delay and then assume that
// something has wedged in the kernel or runtime.
waitDelay.Set(reflect.ValueOf(30 * time.Second))
}
// On darwin the cwd gets resolved to the real path, which breaks anything that
// expects the working directory to keep the original path, including the
// go command when dealing with modules.
@ -229,6 +243,7 @@ func (i *Invocation) run(ctx context.Context, stdout, stderr io.Writer) error {
cmd.Env = append(cmd.Env, "PWD="+i.WorkingDir)
cmd.Dir = i.WorkingDir
}
defer func(start time.Time) { log("%s for %v", time.Since(start), cmdDebugStr(cmd)) }(time.Now())
return runCmdContext(ctx, cmd)
@ -242,10 +257,85 @@ var DebugHangingGoCommands = false
// runCmdContext is like exec.CommandContext except it sends os.Interrupt
// before os.Kill.
func runCmdContext(ctx context.Context, cmd *exec.Cmd) error {
if err := cmd.Start(); err != nil {
func runCmdContext(ctx context.Context, cmd *exec.Cmd) (err error) {
// If cmd.Stdout is not an *os.File, the exec package will create a pipe and
// copy it to the Writer in a goroutine until the process has finished and
// either the pipe reaches EOF or command's WaitDelay expires.
//
// However, the output from 'go list' can be quite large, and we don't want to
// keep reading (and allocating buffers) if we've already decided we don't
// care about the output. We don't want to wait for the process to finish, and
// we don't want to wait for the WaitDelay to expire either.
//
// Instead, if cmd.Stdout requires a copying goroutine we explicitly replace
// it with a pipe (which is an *os.File), which we can close in order to stop
// copying output as soon as we realize we don't care about it.
var stdoutW *os.File
if cmd.Stdout != nil {
if _, ok := cmd.Stdout.(*os.File); !ok {
var stdoutR *os.File
stdoutR, stdoutW, err = os.Pipe()
if err != nil {
return err
}
prevStdout := cmd.Stdout
cmd.Stdout = stdoutW
stdoutErr := make(chan error, 1)
go func() {
_, err := io.Copy(prevStdout, stdoutR)
if err != nil {
err = fmt.Errorf("copying stdout: %w", err)
}
stdoutErr <- err
}()
defer func() {
// We started a goroutine to copy a stdout pipe.
// Wait for it to finish, or terminate it if need be.
var err2 error
select {
case err2 = <-stdoutErr:
stdoutR.Close()
case <-ctx.Done():
stdoutR.Close()
// Per https://pkg.go.dev/os#File.Close, the call to stdoutR.Close
// should cause the Read call in io.Copy to unblock and return
// immediately, but we still need to receive from stdoutErr to confirm
// that that has happened.
<-stdoutErr
err2 = ctx.Err()
}
if err == nil {
err = err2
}
}()
// Per https://pkg.go.dev/os/exec#Cmd, “If Stdout and Stderr are the
// same writer, and have a type that can be compared with ==, at most
// one goroutine at a time will call Write.”
//
// Since we're starting a goroutine that writes to cmd.Stdout, we must
// also update cmd.Stderr so that that still holds.
func() {
defer func() { recover() }()
if cmd.Stderr == prevStdout {
cmd.Stderr = cmd.Stdout
}
}()
}
}
err = cmd.Start()
if stdoutW != nil {
// The child process has inherited the pipe file,
// so close the copy held in this process.
stdoutW.Close()
stdoutW = nil
}
if err != nil {
return err
}
resChan := make(chan error, 1)
go func() {
resChan <- cmd.Wait()
@ -253,11 +343,14 @@ func runCmdContext(ctx context.Context, cmd *exec.Cmd) error {
// If we're interested in debugging hanging Go commands, stop waiting after a
// minute and panic with interesting information.
if DebugHangingGoCommands {
debug := DebugHangingGoCommands
if debug {
timer := time.NewTimer(1 * time.Minute)
defer timer.Stop()
select {
case err := <-resChan:
return err
case <-time.After(1 * time.Minute):
case <-timer.C:
HandleHangingGoCommand(cmd.Process)
case <-ctx.Done():
}
@ -270,30 +363,25 @@ func runCmdContext(ctx context.Context, cmd *exec.Cmd) error {
}
// Cancelled. Interrupt and see if it ends voluntarily.
cmd.Process.Signal(os.Interrupt)
select {
case err := <-resChan:
return err
case <-time.After(time.Second):
if err := cmd.Process.Signal(os.Interrupt); err == nil {
// (We used to wait only 1s but this proved
// fragile on loaded builder machines.)
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
select {
case err := <-resChan:
return err
case <-timer.C:
}
}
// Didn't shut down in response to interrupt. Kill it hard.
// TODO(rfindley): per advice from bcmills@, it may be better to send SIGQUIT
// on certain platforms, such as unix.
if err := cmd.Process.Kill(); err != nil && DebugHangingGoCommands {
// Don't panic here as this reliably fails on windows with EINVAL.
if err := cmd.Process.Kill(); err != nil && !errors.Is(err, os.ErrProcessDone) && debug {
log.Printf("error killing the Go command: %v", err)
}
// See above: don't wait indefinitely if we're debugging hanging Go commands.
if DebugHangingGoCommands {
select {
case err := <-resChan:
return err
case <-time.After(10 * time.Second): // a shorter wait as resChan should return quickly following Kill
HandleHangingGoCommand(cmd.Process)
}
}
return <-resChan
}

@ -23,21 +23,11 @@ import (
func GoVersion(ctx context.Context, inv Invocation, r *Runner) (int, error) {
inv.Verb = "list"
inv.Args = []string{"-e", "-f", `{{context.ReleaseTags}}`, `--`, `unsafe`}
inv.Env = append(append([]string{}, inv.Env...), "GO111MODULE=off")
// Unset any unneeded flags, and remove them from BuildFlags, if they're
// present.
inv.ModFile = ""
inv.BuildFlags = nil // This is not a build command.
inv.ModFlag = ""
var buildFlags []string
for _, flag := range inv.BuildFlags {
// Flags can be prefixed by one or two dashes.
f := strings.TrimPrefix(strings.TrimPrefix(flag, "-"), "-")
if strings.HasPrefix(f, "mod=") || strings.HasPrefix(f, "modfile=") {
continue
}
buildFlags = append(buildFlags, flag)
}
inv.BuildFlags = buildFlags
inv.ModFile = ""
inv.Env = append(inv.Env[:len(inv.Env):len(inv.Env)], "GO111MODULE=off")
stdoutBytes, err := r.Run(ctx, inv)
if err != nil {
return 0, err

@ -414,9 +414,16 @@ func (p *pass) fix() ([]*ImportFix, bool) {
})
}
}
// Collecting fixes involved map iteration, so sort for stability. See
// golang/go#59976.
sortFixes(fixes)
// collect selected fixes in a separate slice, so that it can be sorted
// separately. Note that these fixes must occur after fixes to existing
// imports. TODO(rfindley): figure out why.
var selectedFixes []*ImportFix
for _, imp := range selected {
fixes = append(fixes, &ImportFix{
selectedFixes = append(selectedFixes, &ImportFix{
StmtInfo: ImportInfo{
Name: p.importSpecName(imp),
ImportPath: imp.ImportPath,
@ -425,8 +432,25 @@ func (p *pass) fix() ([]*ImportFix, bool) {
FixType: AddImport,
})
}
sortFixes(selectedFixes)
return append(fixes, selectedFixes...), true
}
return fixes, true
func sortFixes(fixes []*ImportFix) {
sort.Slice(fixes, func(i, j int) bool {
fi, fj := fixes[i], fixes[j]
if fi.StmtInfo.ImportPath != fj.StmtInfo.ImportPath {
return fi.StmtInfo.ImportPath < fj.StmtInfo.ImportPath
}
if fi.StmtInfo.Name != fj.StmtInfo.Name {
return fi.StmtInfo.Name < fj.StmtInfo.Name
}
if fi.IdentName != fj.IdentName {
return fi.IdentName < fj.IdentName
}
return fi.FixType < fj.FixType
})
}
// importSpecName gets the import name of imp in the import spec.

@ -75,7 +75,7 @@ github.com/gin-gonic/gin/binding
github.com/gin-gonic/gin/internal/bytesconv
github.com/gin-gonic/gin/internal/json
github.com/gin-gonic/gin/render
# github.com/go-co-op/gocron v1.25.0
# github.com/go-co-op/gocron v1.26.0
## explicit; go 1.20
github.com/go-co-op/gocron
# github.com/go-logr/logr v1.2.4
@ -460,7 +460,7 @@ go.uber.org/zap/zapcore
# golang.org/x/arch v0.3.0
## explicit; go 1.17
golang.org/x/arch/x86/x86asm
# golang.org/x/crypto v0.8.0
# golang.org/x/crypto v0.9.0
## explicit; go 1.17
golang.org/x/crypto/blowfish
golang.org/x/crypto/chacha20
@ -480,7 +480,7 @@ golang.org/x/crypto/ssh/internal/bcrypt_pbkdf
golang.org/x/mod/internal/lazyregexp
golang.org/x/mod/module
golang.org/x/mod/semver
# golang.org/x/net v0.9.0
# golang.org/x/net v0.10.0
## explicit; go 1.17
golang.org/x/net/html
golang.org/x/net/html/atom
@ -531,7 +531,7 @@ golang.org/x/text/width
# golang.org/x/time v0.3.0
## explicit
golang.org/x/time/rate
# golang.org/x/tools v0.8.0
# golang.org/x/tools v0.9.1
## explicit; go 1.18
golang.org/x/tools/go/ast/astutil
golang.org/x/tools/go/gcexportdata
