master
李光春 12 months ago
parent ae848ac95d
commit f762653c62

@ -10,7 +10,6 @@ require (
github.com/basgys/goxml2json v1.1.0
github.com/bytedance/sonic v1.8.8
github.com/gin-gonic/gin v1.9.0
github.com/go-co-op/gocron v1.27.0
github.com/go-playground/locales v0.14.1
github.com/go-playground/universal-translator v0.18.1
github.com/go-playground/validator/v10 v10.13.0
@ -78,6 +77,7 @@ require (
github.com/oschwald/maxminddb-golang v1.10.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.7 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rogpeppe/go-internal v1.8.1 // indirect
github.com/saracen/go7z-fixtures v0.0.0-20190623165746-aa6b8fba1d2f // indirect
github.com/saracen/solidblock v0.0.0-20190426153529-45df20abab6f // indirect
github.com/syndtr/goleveldb v1.0.0 // indirect

@ -106,8 +106,6 @@ github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.9.0 h1:OjyFBKICoexlu99ctXNR2gg+c5pKrKMuyjgARg9qeY8=
github.com/gin-gonic/gin v1.9.0/go.mod h1:W1Me9+hsUSyj3CePGrd1/QrKJMSJ1Tu/0hFEH89961k=
github.com/go-co-op/gocron v1.27.0 h1:GbP9A0wauIeGbMCUzdGb2IAi1JHzNHT/H/lLW2ODwLE=
github.com/go-co-op/gocron v1.27.0/go.mod h1:39f6KNSGVOU1LO/ZOoZfcSxwlsJDQOKSu8erN0SH48Y=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgOZ7o=
@ -576,7 +574,6 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=

@ -1,5 +1,5 @@
package go_library
func Version() string {
return "1.0.145"
return "1.0.146"
}

@ -18,6 +18,10 @@ type Client struct {
status bool // 状态
client *golog.ApiClient // 日志服务
}
zap struct {
status bool // 状态
client *golog.ApiZapLog // 日志服务
}
}
// NewClient 创建实例化

@ -10,3 +10,12 @@ func (c *Client) ConfigApiClientFun(apiClientFun golog.ApiClientFun) {
c.log.status = true
}
}
// ConfigZapClientFun 日志配置
func (c *Client) ConfigZapClientFun(apiZapLogFun golog.ApiZapLogFun) {
apiZapLog := apiZapLogFun()
if apiZapLog != nil {
c.zap.client = apiZapLog
c.zap.status = true
}
}

@ -2,7 +2,6 @@ package douyin
import (
"context"
"github.com/dtapps/go-library"
"github.com/dtapps/go-library/utils/gorequest"
)
@ -34,7 +33,10 @@ func (c *Client) request(ctx context.Context, url string, params map[string]inte
// 记录日志
if c.log.status {
go c.log.client.Middleware(ctx, request, go_library.Version())
go c.log.client.Middleware(ctx, request)
}
if c.zap.status {
go c.zap.client.Middleware(ctx, request)
}
return request, err

@ -1,11 +0,0 @@
package gpcron
import (
"github.com/go-co-op/gocron"
)
// https://github.com/go-co-op/gocron
var (
Cron *gocron.Job
)

@ -1,19 +0,0 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, built with `go test -c`
*.test
local_testing
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
vendor/
# IDE project files
.idea

@ -1,50 +0,0 @@
run:
timeout: 2m
issues-exit-code: 1
tests: true
issues:
max-same-issues: 100
exclude-rules:
- path: _test\.go
linters:
- bodyclose
- errcheck
- gosec
linters:
enable:
- bodyclose
- errcheck
- gofmt
- gofumpt
- goimports
- gosec
- gosimple
- govet
- ineffassign
- misspell
- revive
- staticcheck
- typecheck
- unused
output:
# colored-line-number|line-number|json|tab|checkstyle|code-climate, default is "colored-line-number"
format: colored-line-number
# print lines of code with issue, default is true
print-issued-lines: true
# print linter name in the end of issue text, default is true
print-linter-name: true
# make issues output unique by line, default is true
uniq-by-line: true
# add a prefix to the output file references; default is no prefix
path-prefix: ""
# sorts results by: filepath, line and column
sort-results: true
linters-settings:
golint:
min-confidence: 0.8
fix: true

@ -1,73 +0,0 @@
# Contributor Covenant Code of Conduct
## Our Pledge
In the interest of fostering an open and welcoming environment, we as
contributors and maintainers pledge to making participation in our project and
our community a harassment-free experience for everyone. And we mean everyone!
## Our Standards
Examples of behavior that contributes to creating a positive environment
include:
* Using welcoming and kind language
* Being respectful of differing viewpoints and experiences
* Gracefully accepting constructive criticism
* Focusing on what is best for the community
* Showing empathy towards other community members
Examples of unacceptable behavior by participants include:
* The use of sexualized language or imagery and unwelcome sexual attention or
advances
* Trolling, insulting/derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or electronic
address, without explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Our Responsibilities
Project maintainers are responsible for clarifying the standards of acceptable
behavior and are expected to take appropriate and fair corrective action in
response to any instances of unacceptable behavior.
Project maintainers have the right and responsibility to remove, edit, or
reject comments, commits, code, wiki edits, issues, and other contributions
that are not aligned to this Code of Conduct, or to ban temporarily or
permanently any contributor for other behaviors that they deem inappropriate,
threatening, offensive, or harmful.
## Scope
This Code of Conduct applies both within project spaces and in public spaces
when an individual is representing the project or its community. Examples of
representing a project or community include using an official project e-mail
address, posting via an official social media account, or acting as an appointed
representative at an online or offline event. Representation of a project may be
further defined and clarified by project maintainers.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported by contacting the project team initially on Slack to coordinate private communication. All
complaints will be reviewed and investigated and will result in a response that
is deemed necessary and appropriate to the circumstances. The project team is
obligated to maintain confidentiality with regard to the reporter of an incident.
Further details of specific enforcement policies may be posted separately.
Project maintainers who do not follow or enforce the Code of Conduct in good
faith may face temporary or permanent repercussions as determined by other
members of the project's leadership.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4,
available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html
[homepage]: https://www.contributor-covenant.org
For answers to common questions about this code of conduct, see
https://www.contributor-covenant.org/faq

@ -1,40 +0,0 @@
# Contributing to gocron
Thank you for coming to contribute to gocron! We welcome new ideas, PRs and general feedback.
## Reporting Bugs
If you find a bug then please let the project know by opening an issue after doing the following:
- Do a quick search of the existing issues to make sure the bug isn't already reported
- Try and make a minimal list of steps that can reliably reproduce the bug you are experiencing
- Collect as much information as you can to help identify what the issue is (project version, configuration files, etc)
## Suggesting Enhancements
If you have a use case that you don't see a way to support yet, we would welcome the feedback in an issue. Before opening the issue, please consider:
- Is this a common use case?
- Is it simple to understand?
You can help us out by doing the following before raising a new issue:
- Check that the feature hasn't been requested already by searching existing issues
- Try and reduce your enhancement into a single, concise and deliverable request, rather than a general idea
- Explain your own use cases as the basis of the request
## Adding Features
Pull requests are always welcome. However, before going through the trouble of implementing a change it's worth creating a bug or feature request issue.
This allows us to discuss the changes and make sure they are a good fit for the project.
Please always make sure a pull request has been:
- Unit tested with `make test`
- Linted with `make lint`
- Vetted with `make vet`
- Formatted with `make fmt` or validated with `make check-fmt`
## Writing Tests
Tests should follow the [table driven test pattern](https://dave.cheney.net/2013/06/09/writing-table-driven-tests-in-go). See other tests in the code base for additional examples.

@ -1,21 +0,0 @@
MIT License
Copyright (c) 2014, 辣椒面
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

@ -1,13 +0,0 @@
.PHONY: fmt check-fmt lint vet test
GO_PKGS := $(shell go list -f {{.Dir}} ./...)
fmt:
@go list -f {{.Dir}} ./... | xargs -I{} gofmt -w -s {}
lint:
@grep "^func " example_test.go | sort -c
@golangci-lint run
test:
@go test -race -v $(GO_FLAGS) -count=1 $(GO_PKGS)

@ -1,203 +0,0 @@
# gocron: A Golang Job Scheduling Package.
[![CI State](https://github.com/go-co-op/gocron/actions/workflows/go_test.yml/badge.svg?branch=main&event=push)](https://github.com/go-co-op/gocron/actions)
![Go Report Card](https://goreportcard.com/badge/github.com/go-co-op/gocron) [![Go Doc](https://godoc.org/github.com/go-co-op/gocron?status.svg)](https://pkg.go.dev/github.com/go-co-op/gocron)
gocron is a job scheduling package which lets you run Go functions at pre-determined intervals
using a simple, human-friendly syntax.
gocron is a Golang scheduler implementation similar to the Ruby module
[clockwork](https://github.com/tomykaira/clockwork) and the Python job scheduling package [schedule](https://github.com/dbader/schedule).
See also these two great articles that were used for design input:
- [Rethinking Cron](http://adam.herokuapp.com/past/2010/4/13/rethinking_cron/)
- [Replace Cron with Clockwork](http://adam.herokuapp.com/past/2010/6/30/replace_cron_with_clockwork/)
If you want to chat, you can find us at Slack!
[<img src="https://img.shields.io/badge/gophers-gocron-brightgreen?logo=slack">](https://gophers.slack.com/archives/CQ7T0T1FW)
## Concepts
- **Scheduler**: The scheduler tracks all the jobs assigned to it and makes sure they are passed to the executor when
ready to be run. The scheduler is able to manage overall aspects of job behavior like limiting how many jobs
are running at one time.
- **Job**: The job is simply aware of the task (go function) it's provided and is therefore only able to perform
actions related to that task like preventing itself from overruning a previous task that is taking a long time.
- **Executor**: The executor, as it's name suggests, is simply responsible for calling the task (go function) that
the job hands to it when sent by the scheduler.
## Examples
```golang
s := gocron.NewScheduler(time.UTC)
job, err := s.Every(5).Seconds().Do(func(){ ... })
if err != nil {
// handle the error related to setting up the job
}
// strings parse to duration
s.Every("5m").Do(func(){ ... })
s.Every(5).Days().Do(func(){ ... })
s.Every(1).Month(1, 2, 3).Do(func(){ ... })
// set time
s.Every(1).Day().At("10:30").Do(func(){ ... })
// set multiple times
s.Every(1).Day().At("10:30;08:00").Do(func(){ ... })
s.Every(1).Day().At("10:30").At("08:00").Do(func(){ ... })
// Schedule each last day of the month
s.Every(1).MonthLastDay().Do(func(){ ... })
// Or each last day of every other month
s.Every(2).MonthLastDay().Do(func(){ ... })
// cron expressions supported
s.Cron("*/1 * * * *").Do(task) // every minute
// cron second-level expressions supported
s.CronWithSeconds("*/1 * * * * *").Do(task) // every second
// you can start running the scheduler in two different ways:
// starts the scheduler asynchronously
s.StartAsync()
// starts the scheduler and blocks current execution path
s.StartBlocking()
```
For more examples, take a look in our [go docs](https://pkg.go.dev/github.com/go-co-op/gocron#pkg-examples)
## Options
| Interval | Supported schedule options |
| ------------ | ------------------------------------------------------------------- |
| sub-second | `StartAt()` |
| milliseconds | `StartAt()` |
| seconds | `StartAt()` |
| minutes | `StartAt()` |
| hours | `StartAt()` |
| days | `StartAt()`, `At()` |
| weeks | `StartAt()`, `At()`, `Weekday()` (and all week day named functions) |
| months | `StartAt()`, `At()` |
There are several options available to restrict how jobs run:
| Mode | Function | Behavior |
|---------------------|---------------------------|------------------------------------------------------------------------------------------------------|
| Default | | jobs are rescheduled at every interval |
| Job singleton | `SingletonMode()` | a long running job will not be rescheduled until the current run is completed |
| Scheduler limit | `SetMaxConcurrentJobs()` | set a collective maximum number of concurrent jobs running across the scheduler |
| Distributed locking | `WithDistributedLocker()` | prevents the same job from being run more than once when running multiple instances of the scheduler |
## Distributed Locker Implementations
- Redis: [redislock](https://github.com/go-co-op/gocron-redis-lock) `go get github.com/go-co-op/gocron-redis-lock`
## Tags
Jobs may have arbitrary tags added which can be useful when tracking many jobs.
The scheduler supports both enforcing tags to be unique and when not unique,
running all jobs with a given tag.
```golang
s := gocron.NewScheduler(time.UTC)
s.TagsUnique()
_, _ = s.Every(1).Week().Tag("foo").Do(task)
_, err := s.Every(1).Week().Tag("foo").Do(task)
// error!!!
s := gocron.NewScheduler(time.UTC)
s.Every(2).Day().Tag("tag").At("10:00").Do(task)
s.Every(1).Minute().Tag("tag").Do(task)
s.RunByTag("tag")
// both jobs will run
```
## FAQ
- Q: I'm running multiple pods on a distributed environment. How can I make a job not run once per pod causing duplication?
- We recommend using your own lock solution within the jobs themselves (you could use [Redis](https://redis.io/topics/distlock), for example)
- A2: Use the scheduler option `WithDistributedLocker` and either use an implemented [backend](#distributed-locker-implementations)
or implement your own and contribute it back in a PR!
- Q: I've removed my job from the scheduler, but how can I stop a long-running job that has already been triggered?
- A: We recommend using a means of canceling your job, e.g. a `context.WithCancel()`.
- A2: You can listen to the job context Done channel to know when the job has been canceled
```golang
task := func(in string, job gocron.Job) {
fmt.Printf("this job's last run: %s this job's next run: %s\n", job.LastRun(), job.NextRun())
fmt.Printf("in argument is %s\n", in)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-job.Context().Done():
fmt.Printf("function has been canceled, performing cleanup and exiting gracefully\n")
return
case <-ticker.C:
fmt.Printf("performing a hard job that takes a long time that I want to kill whenever I want\n")
}
}
}
var err error
s := gocron.NewScheduler(time.UTC)
s.SingletonModeAll()
j, err := s.Every(1).Hour().Tag("myJob").DoWithJobDetails(task, "foo")
if err != nil {
log.Fatalln("error scheduling job", err)
}
s.StartAsync()
// Simulate some more work
time.Sleep(time.Second)
// I want to stop the job, together with the underlying goroutine
fmt.Printf("now I want to kill the job\n")
err = s.RemoveByTag("myJob")
if err != nil {
log.Fatalln("error removing job by tag", err)
}
// Wait a bit so that we can see that the job is exiting gracefully
time.Sleep(time.Second)
fmt.Printf("Job: %#v, Error: %#v", j, err)
```
---
Looking to contribute? Try to follow these guidelines:
- Use issues for everything
- For a small change, just send a PR!
- For bigger changes, please open an issue for discussion before sending a PR.
- PRs should have: tests, documentation and examples (if it makes sense)
- You can also contribute by:
- Reporting issues
- Suggesting new features or enhancements
- Improving/fixing documentation
---
## Design
![design-diagram](https://user-images.githubusercontent.com/19351306/110375142-2ba88680-8017-11eb-80c3-554cc746b165.png)
[Jetbrains](https://www.jetbrains.com/?from=gocron) supports this project with GoLand licenses. We appreciate their support for free and open source software!
## Star History
[![Star History Chart](https://api.star-history.com/svg?repos=go-co-op/gocron&type=Date)](https://star-history.com/#go-co-op/gocron&Date)

@ -1,15 +0,0 @@
# Security Policy
## Supported Versions
The current plan is to maintain version 1 as long as possible incorporating any necessary security patches.
| Version | Supported |
| ------- | ------------------ |
| 1.x.x | :white_check_mark: |
## Reporting a Vulnerability
Vulnerabilities can be reported by [opening an issue](https://github.com/go-co-op/gocron/issues/new/choose) or reaching out on Slack: [<img src="https://img.shields.io/badge/gophers-gocron-brightgreen?logo=slack">](https://gophers.slack.com/archives/CQ7T0T1FW)
We will do our best to addrerss any vulnerabilities in an expeditious manner.

@ -1,252 +0,0 @@
package gocron
import (
"context"
"sync"
"time"
"go.uber.org/atomic"
)
const (
// RescheduleMode - the default is that if a limit on maximum
// concurrent jobs is set and the limit is reached, a job will
// skip it's run and try again on the next occurrence in the schedule
RescheduleMode limitMode = iota
// WaitMode - if a limit on maximum concurrent jobs is set
// and the limit is reached, a job will wait to try and run
// until a spot in the limit is freed up.
//
// Note: this mode can produce unpredictable results as
// job execution order isn't guaranteed. For example, a job that
// executes frequently may pile up in the wait queue and be executed
// many times back to back when the queue opens.
//
// Warning: do not use this mode if your jobs will continue to stack
// up beyond the ability of the limit workers to keep up. An example of
// what NOT to do:
//
// s.Every("1s").Do(func() {
// // this will result in an ever-growing number of goroutines
// // blocked trying to send to the buffered channel
// time.Sleep(10 * time.Minute)
// })
WaitMode
)
type executor struct {
jobFunctions chan jobFunction // the chan upon which the jobFunctions are passed in from the scheduler
ctx context.Context // used to tell the executor to stop
cancel context.CancelFunc // used to tell the executor to stop
wg *sync.WaitGroup // used by the scheduler to wait for the executor to stop
jobsWg *sync.WaitGroup // used by the executor to wait for all jobs to finish
singletonWgs *sync.Map // used by the executor to wait for the singleton runners to complete
limitMode limitMode // when SetMaxConcurrentJobs() is set upon the scheduler
limitModeMaxRunningJobs int // stores the maximum number of concurrently running jobs
limitModeFuncsRunning *atomic.Int64 // tracks the count of limited mode funcs running
limitModeFuncWg *sync.WaitGroup // allow the executor to wait for limit mode functions to wrap up
limitModeQueue chan jobFunction // pass job functions to the limit mode workers
limitModeQueueMu *sync.Mutex // for protecting the limitModeQueue
limitModeRunningJobs *atomic.Int64 // tracks the count of running jobs to check against the max
stopped *atomic.Bool // allow workers to drain the buffered limitModeQueue
distributedLocker Locker // support running jobs across multiple instances
}
func newExecutor() executor {
e := executor{
jobFunctions: make(chan jobFunction, 1),
singletonWgs: &sync.Map{},
limitModeFuncsRunning: atomic.NewInt64(0),
limitModeFuncWg: &sync.WaitGroup{},
limitModeRunningJobs: atomic.NewInt64(0),
limitModeQueueMu: &sync.Mutex{},
}
return e
}
func runJob(f jobFunction) {
f.runStartCount.Add(1)
f.isRunning.Store(true)
callJobFunc(f.eventListeners.onBeforeJobExecution)
callJobFuncWithParams(f.function, f.parameters)
callJobFunc(f.eventListeners.onAfterJobExecution)
f.isRunning.Store(false)
f.runFinishCount.Add(1)
}
func (jf *jobFunction) singletonRunner() {
jf.singletonRunnerOn.Store(true)
jf.singletonWg.Add(1)
for {
select {
case <-jf.ctx.Done():
jf.singletonWg.Done()
jf.singletonRunnerOn.Store(false)
jf.singletonQueue = make(chan struct{}, 1000)
jf.stopped.Store(false)
return
case <-jf.singletonQueue:
if !jf.stopped.Load() {
runJob(*jf)
}
}
}
}
func (e *executor) limitModeRunner() {
for {
select {
case <-e.ctx.Done():
e.limitModeFuncsRunning.Inc()
e.limitModeFuncWg.Done()
return
case jf := <-e.limitModeQueue:
if !e.stopped.Load() {
e.runJob(jf)
}
}
}
}
func (e *executor) start() {
e.wg = &sync.WaitGroup{}
e.wg.Add(1)
stopCtx, cancel := context.WithCancel(context.Background())
e.ctx = stopCtx
e.cancel = cancel
e.jobsWg = &sync.WaitGroup{}
e.stopped = atomic.NewBool(false)
e.limitModeQueueMu.Lock()
e.limitModeQueue = make(chan jobFunction, 1000)
e.limitModeQueueMu.Unlock()
go e.run()
}
func (e *executor) runJob(f jobFunction) {
switch f.runConfig.mode {
case defaultMode:
lockKey := f.jobName
if lockKey == "" {
lockKey = f.funcName
}
if e.distributedLocker != nil {
l, err := e.distributedLocker.Lock(f.ctx, lockKey)
if err != nil || l == nil {
return
}
defer func() {
durationToNextRun := time.Until(f.jobFuncNextRun)
if durationToNextRun > time.Second*5 {
durationToNextRun = time.Second * 5
}
if durationToNextRun > time.Millisecond*100 {
timeToSleep := time.Duration(float64(durationToNextRun) * 0.9)
select {
case <-e.ctx.Done():
case <-time.After(timeToSleep):
}
}
_ = l.Unlock(f.ctx)
}()
}
runJob(f)
case singletonMode:
e.singletonWgs.Store(f.singletonWg, struct{}{})
if !f.singletonRunnerOn.Load() {
go f.singletonRunner()
}
f.singletonQueue <- struct{}{}
}
}
func (e *executor) run() {
for {
select {
case f := <-e.jobFunctions:
if e.stopped.Load() {
continue
}
if e.limitModeMaxRunningJobs > 0 {
countRunning := e.limitModeFuncsRunning.Load()
if countRunning < int64(e.limitModeMaxRunningJobs) {
diff := int64(e.limitModeMaxRunningJobs) - countRunning
for i := int64(0); i < diff; i++ {
e.limitModeFuncWg.Add(1)
go e.limitModeRunner()
e.limitModeFuncsRunning.Inc()
}
}
}
e.jobsWg.Add(1)
go func() {
defer e.jobsWg.Done()
panicHandlerMutex.RLock()
defer panicHandlerMutex.RUnlock()
if panicHandler != nil {
defer func() {
if r := recover(); r != nil {
panicHandler(f.funcName, r)
}
}()
}
if e.limitModeMaxRunningJobs > 0 {
switch e.limitMode {
case RescheduleMode:
if e.limitModeRunningJobs.Load() < int64(e.limitModeMaxRunningJobs) {
select {
case e.limitModeQueue <- f:
case <-e.ctx.Done():
}
}
case WaitMode:
select {
case e.limitModeQueue <- f:
case <-e.ctx.Done():
}
}
return
}
e.runJob(f)
}()
case <-e.ctx.Done():
e.jobsWg.Wait()
e.wg.Done()
return
}
}
}
func (e *executor) stop() {
e.stopped.Store(true)
e.cancel()
e.wg.Wait()
if e.singletonWgs != nil {
e.singletonWgs.Range(func(key, value interface{}) bool {
if wg, ok := key.(*sync.WaitGroup); ok {
wg.Wait()
}
return true
})
}
if e.limitModeMaxRunningJobs > 0 {
e.limitModeFuncWg.Wait()
e.limitModeQueueMu.Lock()
e.limitModeQueue = nil
e.limitModeQueueMu.Unlock()
}
}

@ -1,129 +0,0 @@
// Package gocron : A Golang Job Scheduling Package.
//
// An in-process scheduler for periodic jobs that uses the builder pattern
// for configuration. gocron lets you run Golang functions periodically
// at pre-determined intervals using a simple, human-friendly syntax.
package gocron
import (
"errors"
"fmt"
"reflect"
"regexp"
"runtime"
"sync"
"time"
)
// PanicHandlerFunc represents a type that can be set to handle panics occurring
// during job execution.
type PanicHandlerFunc func(jobName string, recoverData interface{})
// The global panic handler
var (
panicHandler PanicHandlerFunc
panicHandlerMutex = sync.RWMutex{}
)
// SetPanicHandler sets the global panicHandler to the given function.
// Leaving it nil or setting it to nil disables automatic panic handling.
// If the panicHandler is not nil, any panic that occurs during executing a job will be recovered
// and the panicHandlerFunc will be called with the job's funcName and the recover data.
func SetPanicHandler(handler PanicHandlerFunc) {
panicHandlerMutex.Lock()
defer panicHandlerMutex.Unlock()
panicHandler = handler
}
// Error declarations for gocron related errors
var (
ErrNotAFunction = errors.New("gocron: only functions can be scheduled into the job queue")
ErrNotScheduledWeekday = errors.New("gocron: job not scheduled weekly on a weekday")
ErrJobNotFoundWithTag = errors.New("gocron: no jobs found with given tag")
ErrUnsupportedTimeFormat = errors.New("gocron: the given time format is not supported")
ErrInvalidInterval = errors.New("gocron: .Every() interval must be greater than 0")
ErrInvalidIntervalType = errors.New("gocron: .Every() interval must be int, time.Duration, or string")
ErrInvalidIntervalUnitsSelection = errors.New("gocron: .Every(time.Duration) and .Cron() cannot be used with units (e.g. .Seconds())")
ErrInvalidFunctionParameters = errors.New("gocron: length of function parameters must match job function parameters")
ErrAtTimeNotSupported = errors.New("gocron: the At() method is not supported for this time unit")
ErrWeekdayNotSupported = errors.New("gocron: weekday is not supported for time unit")
ErrInvalidDayOfMonthEntry = errors.New("gocron: only days 1 through 28 are allowed for monthly schedules")
ErrTagsUnique = func(tag string) error { return fmt.Errorf("gocron: a non-unique tag was set on the job: %s", tag) }
ErrWrongParams = errors.New("gocron: wrong list of params")
ErrDoWithJobDetails = errors.New("gocron: DoWithJobDetails expects a function whose last parameter is a gocron.Job")
ErrUpdateCalledWithoutJob = errors.New("gocron: a call to Scheduler.Update() requires a call to Scheduler.Job() first")
ErrCronParseFailure = errors.New("gocron: cron expression failed to be parsed")
ErrInvalidDaysOfMonthDuplicateValue = errors.New("gocron: duplicate days of month is not allowed in Month() and Months() methods")
)
func wrapOrError(toWrap error, err error) error {
var returnErr error
if toWrap != nil && !errors.Is(err, toWrap) {
returnErr = fmt.Errorf("%s: %w", err, toWrap)
} else {
returnErr = err
}
return returnErr
}
// regex patterns for supported time formats
var (
timeWithSeconds = regexp.MustCompile(`(?m)^\d{1,2}:\d\d:\d\d$`)
timeWithoutSeconds = regexp.MustCompile(`(?m)^\d{1,2}:\d\d$`)
)
type schedulingUnit int
const (
// default unit is seconds
milliseconds schedulingUnit = iota
seconds
minutes
hours
days
weeks
months
duration
crontab
)
func callJobFunc(jobFunc interface{}) {
if jobFunc != nil {
reflect.ValueOf(jobFunc).Call([]reflect.Value{})
}
}
func callJobFuncWithParams(jobFunc interface{}, params []interface{}) {
f := reflect.ValueOf(jobFunc)
if len(params) != f.Type().NumIn() {
return
}
in := make([]reflect.Value, len(params))
for k, param := range params {
in[k] = reflect.ValueOf(param)
}
f.Call(in)
}
func getFunctionName(fn interface{}) string {
return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
}
func parseTime(t string) (hour, min, sec int, err error) {
var timeLayout string
switch {
case timeWithSeconds.Match([]byte(t)):
timeLayout = "15:04:05"
case timeWithoutSeconds.Match([]byte(t)):
timeLayout = "15:04"
default:
return 0, 0, 0, ErrUnsupportedTimeFormat
}
parsedTime, err := time.Parse(timeLayout, t)
if err != nil {
return 0, 0, 0, ErrUnsupportedTimeFormat
}
return parsedTime.Hour(), parsedTime.Minute(), parsedTime.Second(), nil
}

@ -1,508 +0,0 @@
package gocron
import (
"context"
"fmt"
"math/rand"
"sort"
"sync"
"time"
"github.com/robfig/cron/v3"
"go.uber.org/atomic"
)
// Job struct stores the information necessary to run a Job
type Job struct {
mu *jobMutex
jobFunction
interval int // interval * unit between runs
random // details for randomness
duration time.Duration // time duration between runs
unit schedulingUnit // time units, e.g. 'minutes', 'hours'...
startsImmediately bool // if the Job should run upon scheduler start
atTimes []time.Duration // optional time(s) at which this Job runs when interval is day
startAtTime time.Time // optional time at which the Job starts
error error // error related to Job
lastRun time.Time // datetime of last run
nextRun time.Time // datetime of next run
scheduledWeekdays []time.Weekday // Specific days of the week to start on
daysOfTheMonth []int // Specific days of the month to run the job
tags []string // allow the user to tag Jobs with certain labels
timer *time.Timer // handles running tasks at specific time
cronSchedule cron.Schedule // stores the schedule when a task uses cron
runWithDetails bool // when true the job is passed as the last arg of the jobFunc
}
type random struct {
rand *rand.Rand
randomizeInterval bool // whether the interval is random
randomIntervalRange [2]int // random interval range
}
type jobFunction struct {
eventListeners // additional functions to allow run 'em during job performing
function interface{} // task's function
parameters []interface{} // task's function parameters
parametersLen int // length of the passed parameters
jobName string // key of the distributed lock
funcName string // the name of the function - e.g. main.func1
runConfig runConfig // configuration for how many times to run the job
singletonQueue chan struct{} // queues jobs for the singleton runner to handle
singletonRunnerOn *atomic.Bool // whether the runner function for singleton is running
ctx context.Context // for cancellation
cancel context.CancelFunc // for cancellation
isRunning *atomic.Bool // whether the job func is currently being run
runStartCount *atomic.Int64 // number of times the job was started
runFinishCount *atomic.Int64 // number of times the job was finished
singletonWg *sync.WaitGroup // used by singleton runner
stopped *atomic.Bool // tracks whether the job is currently stopped
jobFuncNextRun time.Time // the next time the job is scheduled to run
}
type eventListeners struct {
onBeforeJobExecution interface{} // performs before job executing
onAfterJobExecution interface{} // performs after job executing
}
type jobMutex struct {
sync.RWMutex
}
func (jf *jobFunction) copy() jobFunction {
cp := jobFunction{
eventListeners: jf.eventListeners,
function: jf.function,
parameters: nil,
parametersLen: jf.parametersLen,
funcName: jf.funcName,
jobName: jf.jobName,
runConfig: jf.runConfig,
singletonQueue: jf.singletonQueue,
ctx: jf.ctx,
cancel: jf.cancel,
isRunning: jf.isRunning,
runStartCount: jf.runStartCount,
runFinishCount: jf.runFinishCount,
singletonWg: jf.singletonWg,
singletonRunnerOn: jf.singletonRunnerOn,
stopped: jf.stopped,
jobFuncNextRun: jf.jobFuncNextRun,
}
cp.parameters = append(cp.parameters, jf.parameters...)
return cp
}
type runConfig struct {
finiteRuns bool
maxRuns int
mode mode
}
// mode is the Job's running mode
type mode int8
const (
// defaultMode disable any mode
defaultMode mode = iota
// singletonMode switch to single job mode
singletonMode
)
// newJob creates a new Job with the provided interval
func newJob(interval int, startImmediately bool, singletonMode bool) *Job {
ctx, cancel := context.WithCancel(context.Background())
job := &Job{
mu: &jobMutex{},
interval: interval,
unit: seconds,
lastRun: time.Time{},
nextRun: time.Time{},
jobFunction: jobFunction{
ctx: ctx,
cancel: cancel,
isRunning: atomic.NewBool(false),
runStartCount: atomic.NewInt64(0),
runFinishCount: atomic.NewInt64(0),
singletonRunnerOn: atomic.NewBool(false),
stopped: atomic.NewBool(false),
},
tags: []string{},
startsImmediately: startImmediately,
}
if singletonMode {
job.SingletonMode()
}
return job
}
// Name sets the name of the current job.
//
// If the scheduler is running using WithDistributedLocker(),
// the job name is used as the distributed lock key.
func (j *Job) Name(name string) {
j.mu.Lock()
defer j.mu.Unlock()
j.jobName = name
}
func (j *Job) setRandomInterval(a, b int) {
j.random.rand = rand.New(rand.NewSource(time.Now().UnixNano())) // nolint
j.random.randomizeInterval = true
if a < b {
j.random.randomIntervalRange[0] = a
j.random.randomIntervalRange[1] = b + 1
} else {
j.random.randomIntervalRange[0] = b
j.random.randomIntervalRange[1] = a + 1
}
}
func (j *Job) getRandomInterval() int {
randNum := j.rand.Intn(j.randomIntervalRange[1] - j.randomIntervalRange[0])
return j.randomIntervalRange[0] + randNum
}
func (j *Job) getInterval() int {
if j.randomizeInterval {
return j.getRandomInterval()
}
return j.interval
}
func (j *Job) neverRan() bool {
jobLastRun := j.LastRun()
return jobLastRun.IsZero()
}
func (j *Job) getStartsImmediately() bool {
return j.startsImmediately
}
func (j *Job) setStartsImmediately(b bool) {
j.startsImmediately = b
}
func (j *Job) setTimer(t *time.Timer) {
j.mu.Lock()
defer j.mu.Unlock()
j.timer = t
}
func (j *Job) getFirstAtTime() time.Duration {
var t time.Duration
if len(j.atTimes) > 0 {
t = j.atTimes[0]
}
return t
}
func (j *Job) getAtTime(lastRun time.Time) time.Duration {
var r time.Duration
if len(j.atTimes) == 0 {
return r
}
if len(j.atTimes) == 1 {
return j.atTimes[0]
}
if lastRun.IsZero() {
r = j.atTimes[0]
} else {
for _, d := range j.atTimes {
nt := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day(), 0, 0, 0, 0, lastRun.Location()).Add(d)
if nt.After(lastRun) {
r = d
break
}
}
}
return r
}
func (j *Job) addAtTime(t time.Duration) {
if len(j.atTimes) == 0 {
j.atTimes = append(j.atTimes, t)
return
}
exist := false
index := sort.Search(len(j.atTimes), func(i int) bool {
atTime := j.atTimes[i]
b := atTime >= t
if b {
exist = atTime == t
}
return b
})
// ignore if present
if exist {
return
}
j.atTimes = append(j.atTimes, time.Duration(0))
copy(j.atTimes[index+1:], j.atTimes[index:])
j.atTimes[index] = t
}
func (j *Job) getStartAtTime() time.Time {
return j.startAtTime
}
func (j *Job) setStartAtTime(t time.Time) {
j.startAtTime = t
}
func (j *Job) getUnit() schedulingUnit {
j.mu.RLock()
defer j.mu.RUnlock()
return j.unit
}
func (j *Job) setUnit(t schedulingUnit) {
j.mu.Lock()
defer j.mu.Unlock()
j.unit = t
}
func (j *Job) getDuration() time.Duration {
j.mu.RLock()
defer j.mu.RUnlock()
return j.duration
}
func (j *Job) setDuration(t time.Duration) {
j.mu.Lock()
defer j.mu.Unlock()
j.duration = t
}
// hasTags returns true if all tags are matched on this Job
func (j *Job) hasTags(tags ...string) bool {
// Build map of all Job tags for easy comparison
jobTags := map[string]int{}
for _, tag := range j.tags {
jobTags[tag] = 0
}
// Loop through required tags and if one doesn't exist, return false
for _, tag := range tags {
_, ok := jobTags[tag]
if !ok {
return false
}
}
return true
}
// Error returns an error if one occurred while creating the Job.
// If multiple errors occurred, they will be wrapped and can be
// checked using the standard unwrap options.
func (j *Job) Error() error {
return j.error
}
// Context returns the job's context. The context controls cancellation.
func (j *Job) Context() context.Context {
return j.ctx
}
// Tag allows you to add arbitrary labels to a Job that do not
// impact the functionality of the Job
func (j *Job) Tag(tags ...string) {
j.tags = append(j.tags, tags...)
}
// Untag removes a tag from a Job
func (j *Job) Untag(t string) {
var newTags []string
for _, tag := range j.tags {
if t != tag {
newTags = append(newTags, tag)
}
}
j.tags = newTags
}
// Tags returns the tags attached to the Job
func (j *Job) Tags() []string {
return j.tags
}
// SetEventListeners accepts two functions that will be called, one before and one after the job is run
func (j *Job) SetEventListeners(onBeforeJobExecution interface{}, onAfterJobExecution interface{}) {
j.eventListeners = eventListeners{
onBeforeJobExecution: onBeforeJobExecution,
onAfterJobExecution: onAfterJobExecution,
}
}
// ScheduledTime returns the time of the Job's next scheduled run
func (j *Job) ScheduledTime() time.Time {
j.mu.RLock()
defer j.mu.RUnlock()
return j.nextRun
}
// ScheduledAtTime returns the specific time of day the Job will run at.
// If multiple times are set, the earliest time will be returned.
func (j *Job) ScheduledAtTime() string {
if len(j.atTimes) == 0 {
return "00:00"
}
return fmt.Sprintf("%02d:%02d", j.getFirstAtTime()/time.Hour, (j.getFirstAtTime()%time.Hour)/time.Minute)
}
// ScheduledAtTimes returns the specific times of day the Job will run at
func (j *Job) ScheduledAtTimes() []string {
r := make([]string, len(j.atTimes))
for i, t := range j.atTimes {
r[i] = fmt.Sprintf("%02d:%02d", t/time.Hour, (t%time.Hour)/time.Minute)
}
return r
}
// Weekday returns which day of the week the Job will run on and
// will return an error if the Job is not scheduled weekly
func (j *Job) Weekday() (time.Weekday, error) {
if len(j.scheduledWeekdays) == 0 {
return time.Sunday, ErrNotScheduledWeekday
}
return j.scheduledWeekdays[0], nil
}
// Weekdays returns a slice of time.Weekday that the Job will run in a week and
// will return an error if the Job is not scheduled weekly
func (j *Job) Weekdays() []time.Weekday {
// appending on j.scheduledWeekdays may cause a side effect
if len(j.scheduledWeekdays) == 0 {
return []time.Weekday{time.Sunday}
}
return j.scheduledWeekdays
}
// LimitRunsTo limits the number of executions of this job to n.
// Upon reaching the limit, the job is removed from the scheduler.
//
// Note: If a job is added to a running scheduler and this method is then used
// you may see the job run more than the set limit as job is scheduled immediately
// by default upon being added to the scheduler. It is recommended to use the
// LimitRunsTo() func on the scheduler chain when scheduling the job.
// For example: scheduler.LimitRunsTo(1).Do()
func (j *Job) LimitRunsTo(n int) {
j.mu.Lock()
defer j.mu.Unlock()
j.runConfig.finiteRuns = true
j.runConfig.maxRuns = n
}
// SingletonMode prevents a new job from starting if the prior job has not yet
// completed it's run
// Note: If a job is added to a running scheduler and this method is then used
// you may see the job run overrun itself as job is scheduled immediately
// by default upon being added to the scheduler. It is recommended to use the
// SingletonMode() func on the scheduler chain when scheduling the job.
func (j *Job) SingletonMode() {
j.mu.Lock()
defer j.mu.Unlock()
j.runConfig.mode = singletonMode
j.jobFunction.singletonWg = &sync.WaitGroup{}
j.jobFunction.singletonQueue = make(chan struct{}, 100)
}
// shouldRun evaluates if this job should run again
// based on the runConfig
func (j *Job) shouldRun() bool {
j.mu.RLock()
defer j.mu.RUnlock()
return !j.runConfig.finiteRuns || j.runStartCount.Load() < int64(j.runConfig.maxRuns)
}
// LastRun returns the time the job was run last
func (j *Job) LastRun() time.Time {
j.mu.RLock()
defer j.mu.RUnlock()
return j.lastRun
}
func (j *Job) setLastRun(t time.Time) {
j.lastRun = t
}
// NextRun returns the time the job will run next
func (j *Job) NextRun() time.Time {
j.mu.RLock()
defer j.mu.RUnlock()
return j.nextRun
}
func (j *Job) setNextRun(t time.Time) {
j.mu.Lock()
defer j.mu.Unlock()
j.nextRun = t
j.jobFunction.jobFuncNextRun = t
}
// RunCount returns the number of times the job has been started
func (j *Job) RunCount() int {
j.mu.Lock()
defer j.mu.Unlock()
return int(j.runStartCount.Load())
}
// FinishedRunCount returns the number of times the job has finished running
func (j *Job) FinishedRunCount() int {
j.mu.Lock()
defer j.mu.Unlock()
return int(j.runFinishCount.Load())
}
func (j *Job) stop() {
j.mu.Lock()
defer j.mu.Unlock()
if j.timer != nil {
j.timer.Stop()
}
if j.cancel != nil {
j.cancel()
j.ctx, j.cancel = context.WithCancel(context.Background())
}
j.stopped.Store(true)
}
// IsRunning reports whether any instances of the job function are currently running
func (j *Job) IsRunning() bool {
return j.isRunning.Load()
}
// you must Lock the job before calling copy
func (j *Job) copy() Job {
return Job{
mu: &jobMutex{},
jobFunction: j.jobFunction,
interval: j.interval,
duration: j.duration,
unit: j.unit,
startsImmediately: j.startsImmediately,
atTimes: j.atTimes,
startAtTime: j.startAtTime,
error: j.error,
lastRun: j.lastRun,
nextRun: j.nextRun,
scheduledWeekdays: j.scheduledWeekdays,
daysOfTheMonth: j.daysOfTheMonth,
tags: j.tags,
timer: j.timer,
cronSchedule: j.cronSchedule,
runWithDetails: j.runWithDetails,
}
}

@ -1,23 +0,0 @@
package gocron
import (
"context"
"errors"
)
var (
ErrFailedToConnectToRedis = errors.New("gocron: failed to connect to redis")
ErrFailedToObtainLock = errors.New("gocron: failed to obtain lock")
ErrFailedToReleaseLock = errors.New("gocron: failed to release lock")
)
// Locker represents the required interface to lock jobs when running multiple schedulers.
type Locker interface {
// Lock if an error is returned by lock, the job will not be scheduled.
Lock(ctx context.Context, key string) (Lock, error)
}
// Lock represents an obtained lock
type Lock interface {
Unlock(ctx context.Context) error
}

File diff suppressed because it is too large Load Diff

@ -1,33 +0,0 @@
package gocron
import "time"
var _ TimeWrapper = (*trueTime)(nil)
// TimeWrapper is an interface that wraps the Now, Sleep, and Unix methods of the time package.
// This allows the library and users to mock the time package for testing.
type TimeWrapper interface {
Now(*time.Location) time.Time
Unix(int64, int64) time.Time
Sleep(time.Duration)
}
type trueTime struct{}
func (t *trueTime) Now(location *time.Location) time.Time {
return time.Now().In(location)
}
func (t *trueTime) Unix(sec int64, nsec int64) time.Time {
return time.Unix(sec, nsec)
}
func (t *trueTime) Sleep(d time.Duration) {
time.Sleep(d)
}
// afterFunc proxies the time.AfterFunc function.
// This allows it to be mocked for testing.
func afterFunc(d time.Duration, f func()) *time.Timer {
return time.AfterFunc(d, f)
}

@ -75,9 +75,6 @@ github.com/gin-gonic/gin/binding
github.com/gin-gonic/gin/internal/bytesconv
github.com/gin-gonic/gin/internal/json
github.com/gin-gonic/gin/render
# github.com/go-co-op/gocron v1.27.0
## explicit; go 1.16
github.com/go-co-op/gocron
# github.com/go-logr/logr v1.2.4
## explicit; go 1.16
github.com/go-logr/logr
@ -292,6 +289,8 @@ github.com/redis/go-redis/v9/internal/util
# github.com/robfig/cron/v3 v3.0.1
## explicit; go 1.12
github.com/robfig/cron/v3
# github.com/rogpeppe/go-internal v1.8.1
## explicit; go 1.16
# github.com/saracen/go7z v0.0.0-20191010121135-9c09b6bd7fda
## explicit
github.com/saracen/go7z

Loading…
Cancel
Save