diff --git a/go.mod b/go.mod index 61dd3b21..850537ae 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,6 @@ require ( github.com/basgys/goxml2json v1.1.0 github.com/bytedance/sonic v1.8.8 github.com/gin-gonic/gin v1.9.0 - github.com/go-co-op/gocron v1.27.0 github.com/go-playground/locales v0.14.1 github.com/go-playground/universal-translator v0.18.1 github.com/go-playground/validator/v10 v10.13.0 @@ -78,6 +77,7 @@ require ( github.com/oschwald/maxminddb-golang v1.10.0 // indirect github.com/pelletier/go-toml/v2 v2.0.7 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/rogpeppe/go-internal v1.8.1 // indirect github.com/saracen/go7z-fixtures v0.0.0-20190623165746-aa6b8fba1d2f // indirect github.com/saracen/solidblock v0.0.0-20190426153529-45df20abab6f // indirect github.com/syndtr/goleveldb v1.0.0 // indirect diff --git a/go.sum b/go.sum index ca0ab131..442bd045 100644 --- a/go.sum +++ b/go.sum @@ -106,8 +106,6 @@ github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= github.com/gin-gonic/gin v1.9.0 h1:OjyFBKICoexlu99ctXNR2gg+c5pKrKMuyjgARg9qeY8= github.com/gin-gonic/gin v1.9.0/go.mod h1:W1Me9+hsUSyj3CePGrd1/QrKJMSJ1Tu/0hFEH89961k= -github.com/go-co-op/gocron v1.27.0 h1:GbP9A0wauIeGbMCUzdGb2IAi1JHzNHT/H/lLW2ODwLE= -github.com/go-co-op/gocron v1.27.0/go.mod h1:39f6KNSGVOU1LO/ZOoZfcSxwlsJDQOKSu8erN0SH48Y= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgOZ7o= @@ -576,7 +574,6 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= -go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= diff --git a/library.go b/library.go index dc533cee..3abbd84d 100644 --- a/library.go +++ b/library.go @@ -1,5 +1,5 @@ package go_library func Version() string { - return "1.0.145" + return "1.0.146" } diff --git a/service/douyin/client.go b/service/douyin/client.go index b419c718..6b94bdfe 100644 --- a/service/douyin/client.go +++ b/service/douyin/client.go @@ -18,6 +18,10 @@ type Client struct { status bool // 状态 client *golog.ApiClient // 日志服务 } + zap struct { + status bool // 状态 + client *golog.ApiZapLog // 日志服务 + } } // NewClient 创建实例化 diff --git a/service/douyin/config.go b/service/douyin/config.go index 7eb09ad6..644f2b92 100644 --- a/service/douyin/config.go +++ b/service/douyin/config.go @@ -10,3 +10,12 @@ func (c *Client) ConfigApiClientFun(apiClientFun golog.ApiClientFun) { c.log.status = true } } + +// ConfigZapClientFun 日志配置 +func (c *Client) ConfigZapClientFun(apiZapLogFun golog.ApiZapLogFun) { + apiZapLog := apiZapLogFun() + if apiZapLog != nil { + c.zap.client = apiZapLog + c.zap.status = true + } +} diff --git a/service/douyin/request.go b/service/douyin/request.go index eaa2fd7c..14744741 100644 --- a/service/douyin/request.go +++ b/service/douyin/request.go @@ -2,7 +2,6 @@ package douyin import ( "context" - "github.com/dtapps/go-library" "github.com/dtapps/go-library/utils/gorequest" ) @@ -34,7 +33,10 @@ func (c *Client) request(ctx context.Context, url string, params map[string]inte // 记录日志 if c.log.status { - go c.log.client.Middleware(ctx, request, go_library.Version()) + go c.log.client.Middleware(ctx, request) + } + if c.zap.status { + go c.zap.client.Middleware(ctx, request) } return request, err diff --git a/utils/gpcron/gpcron.go b/utils/gpcron/gpcron.go deleted file mode 100644 index b43850c8..00000000 --- a/utils/gpcron/gpcron.go +++ /dev/null @@ -1,11 +0,0 @@ -package gpcron - -import ( - "github.com/go-co-op/gocron" -) - -// https://github.com/go-co-op/gocron - -var ( - Cron *gocron.Job -) diff --git a/vendor/github.com/go-co-op/gocron/.gitignore b/vendor/github.com/go-co-op/gocron/.gitignore deleted file mode 100644 index f6409f90..00000000 --- a/vendor/github.com/go-co-op/gocron/.gitignore +++ /dev/null @@ -1,19 +0,0 @@ -# Binaries for programs and plugins -*.exe -*.exe~ -*.dll -*.so -*.dylib - -# Test binary, built with `go test -c` -*.test -local_testing - -# Output of the go coverage tool, specifically when used with LiteIDE -*.out - -# Dependency directories (remove the comment below to include it) -vendor/ - -# IDE project files -.idea diff --git a/vendor/github.com/go-co-op/gocron/.golangci.yaml b/vendor/github.com/go-co-op/gocron/.golangci.yaml deleted file mode 100644 index 7c86335f..00000000 --- a/vendor/github.com/go-co-op/gocron/.golangci.yaml +++ /dev/null @@ -1,50 +0,0 @@ -run: - timeout: 2m - issues-exit-code: 1 - tests: true - -issues: - max-same-issues: 100 - exclude-rules: - - path: _test\.go - linters: - - bodyclose - - errcheck - - gosec - -linters: - enable: - - bodyclose - - errcheck - - gofmt - - gofumpt - - goimports - - gosec - - gosimple - - govet - - ineffassign - - misspell - - revive - - staticcheck - - typecheck - - unused - -output: - # colored-line-number|line-number|json|tab|checkstyle|code-climate, default is "colored-line-number" - format: colored-line-number - # print lines of code with issue, default is true - print-issued-lines: true - # print linter name in the end of issue text, default is true - print-linter-name: true - # make issues output unique by line, default is true - uniq-by-line: true - # add a prefix to the output file references; default is no prefix - path-prefix: "" - # sorts results by: filepath, line and column - sort-results: true - -linters-settings: - golint: - min-confidence: 0.8 - -fix: true diff --git a/vendor/github.com/go-co-op/gocron/CODE_OF_CONDUCT.md b/vendor/github.com/go-co-op/gocron/CODE_OF_CONDUCT.md deleted file mode 100644 index 7d913b55..00000000 --- a/vendor/github.com/go-co-op/gocron/CODE_OF_CONDUCT.md +++ /dev/null @@ -1,73 +0,0 @@ -# Contributor Covenant Code of Conduct - -## Our Pledge - -In the interest of fostering an open and welcoming environment, we as -contributors and maintainers pledge to making participation in our project and -our community a harassment-free experience for everyone. And we mean everyone! - -## Our Standards - -Examples of behavior that contributes to creating a positive environment -include: - -* Using welcoming and kind language -* Being respectful of differing viewpoints and experiences -* Gracefully accepting constructive criticism -* Focusing on what is best for the community -* Showing empathy towards other community members - -Examples of unacceptable behavior by participants include: - -* The use of sexualized language or imagery and unwelcome sexual attention or - advances -* Trolling, insulting/derogatory comments, and personal or political attacks -* Public or private harassment -* Publishing others' private information, such as a physical or electronic - address, without explicit permission -* Other conduct which could reasonably be considered inappropriate in a - professional setting - -## Our Responsibilities - -Project maintainers are responsible for clarifying the standards of acceptable -behavior and are expected to take appropriate and fair corrective action in -response to any instances of unacceptable behavior. - -Project maintainers have the right and responsibility to remove, edit, or -reject comments, commits, code, wiki edits, issues, and other contributions -that are not aligned to this Code of Conduct, or to ban temporarily or -permanently any contributor for other behaviors that they deem inappropriate, -threatening, offensive, or harmful. - -## Scope - -This Code of Conduct applies both within project spaces and in public spaces -when an individual is representing the project or its community. Examples of -representing a project or community include using an official project e-mail -address, posting via an official social media account, or acting as an appointed -representative at an online or offline event. Representation of a project may be -further defined and clarified by project maintainers. - -## Enforcement - -Instances of abusive, harassing, or otherwise unacceptable behavior may be -reported by contacting the project team initially on Slack to coordinate private communication. All -complaints will be reviewed and investigated and will result in a response that -is deemed necessary and appropriate to the circumstances. The project team is -obligated to maintain confidentiality with regard to the reporter of an incident. -Further details of specific enforcement policies may be posted separately. - -Project maintainers who do not follow or enforce the Code of Conduct in good -faith may face temporary or permanent repercussions as determined by other -members of the project's leadership. - -## Attribution - -This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, -available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html - -[homepage]: https://www.contributor-covenant.org - -For answers to common questions about this code of conduct, see -https://www.contributor-covenant.org/faq diff --git a/vendor/github.com/go-co-op/gocron/CONTRIBUTING.md b/vendor/github.com/go-co-op/gocron/CONTRIBUTING.md deleted file mode 100644 index b2d3be83..00000000 --- a/vendor/github.com/go-co-op/gocron/CONTRIBUTING.md +++ /dev/null @@ -1,40 +0,0 @@ -# Contributing to gocron - -Thank you for coming to contribute to gocron! We welcome new ideas, PRs and general feedback. - -## Reporting Bugs - -If you find a bug then please let the project know by opening an issue after doing the following: - -- Do a quick search of the existing issues to make sure the bug isn't already reported -- Try and make a minimal list of steps that can reliably reproduce the bug you are experiencing -- Collect as much information as you can to help identify what the issue is (project version, configuration files, etc) - -## Suggesting Enhancements - -If you have a use case that you don't see a way to support yet, we would welcome the feedback in an issue. Before opening the issue, please consider: - -- Is this a common use case? -- Is it simple to understand? - -You can help us out by doing the following before raising a new issue: - -- Check that the feature hasn't been requested already by searching existing issues -- Try and reduce your enhancement into a single, concise and deliverable request, rather than a general idea -- Explain your own use cases as the basis of the request - -## Adding Features - -Pull requests are always welcome. However, before going through the trouble of implementing a change it's worth creating a bug or feature request issue. -This allows us to discuss the changes and make sure they are a good fit for the project. - -Please always make sure a pull request has been: - -- Unit tested with `make test` -- Linted with `make lint` -- Vetted with `make vet` -- Formatted with `make fmt` or validated with `make check-fmt` - -## Writing Tests - -Tests should follow the [table driven test pattern](https://dave.cheney.net/2013/06/09/writing-table-driven-tests-in-go). See other tests in the code base for additional examples. diff --git a/vendor/github.com/go-co-op/gocron/LICENSE b/vendor/github.com/go-co-op/gocron/LICENSE deleted file mode 100644 index 3357d57d..00000000 --- a/vendor/github.com/go-co-op/gocron/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2014, 辣椒面 - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/vendor/github.com/go-co-op/gocron/Makefile b/vendor/github.com/go-co-op/gocron/Makefile deleted file mode 100644 index 1e16aef6..00000000 --- a/vendor/github.com/go-co-op/gocron/Makefile +++ /dev/null @@ -1,13 +0,0 @@ -.PHONY: fmt check-fmt lint vet test - -GO_PKGS := $(shell go list -f {{.Dir}} ./...) - -fmt: - @go list -f {{.Dir}} ./... | xargs -I{} gofmt -w -s {} - -lint: - @grep "^func " example_test.go | sort -c - @golangci-lint run - -test: - @go test -race -v $(GO_FLAGS) -count=1 $(GO_PKGS) diff --git a/vendor/github.com/go-co-op/gocron/README.md b/vendor/github.com/go-co-op/gocron/README.md deleted file mode 100644 index 4505b9e0..00000000 --- a/vendor/github.com/go-co-op/gocron/README.md +++ /dev/null @@ -1,203 +0,0 @@ -# gocron: A Golang Job Scheduling Package. - -[![CI State](https://github.com/go-co-op/gocron/actions/workflows/go_test.yml/badge.svg?branch=main&event=push)](https://github.com/go-co-op/gocron/actions) -![Go Report Card](https://goreportcard.com/badge/github.com/go-co-op/gocron) [![Go Doc](https://godoc.org/github.com/go-co-op/gocron?status.svg)](https://pkg.go.dev/github.com/go-co-op/gocron) - -gocron is a job scheduling package which lets you run Go functions at pre-determined intervals -using a simple, human-friendly syntax. - -gocron is a Golang scheduler implementation similar to the Ruby module -[clockwork](https://github.com/tomykaira/clockwork) and the Python job scheduling package [schedule](https://github.com/dbader/schedule). - -See also these two great articles that were used for design input: - -- [Rethinking Cron](http://adam.herokuapp.com/past/2010/4/13/rethinking_cron/) -- [Replace Cron with Clockwork](http://adam.herokuapp.com/past/2010/6/30/replace_cron_with_clockwork/) - -If you want to chat, you can find us at Slack! -[](https://gophers.slack.com/archives/CQ7T0T1FW) - -## Concepts - -- **Scheduler**: The scheduler tracks all the jobs assigned to it and makes sure they are passed to the executor when - ready to be run. The scheduler is able to manage overall aspects of job behavior like limiting how many jobs - are running at one time. -- **Job**: The job is simply aware of the task (go function) it's provided and is therefore only able to perform - actions related to that task like preventing itself from overruning a previous task that is taking a long time. -- **Executor**: The executor, as it's name suggests, is simply responsible for calling the task (go function) that - the job hands to it when sent by the scheduler. - -## Examples - -```golang -s := gocron.NewScheduler(time.UTC) - -job, err := s.Every(5).Seconds().Do(func(){ ... }) -if err != nil { - // handle the error related to setting up the job -} - -// strings parse to duration -s.Every("5m").Do(func(){ ... }) - -s.Every(5).Days().Do(func(){ ... }) - -s.Every(1).Month(1, 2, 3).Do(func(){ ... }) - -// set time -s.Every(1).Day().At("10:30").Do(func(){ ... }) - -// set multiple times -s.Every(1).Day().At("10:30;08:00").Do(func(){ ... }) - -s.Every(1).Day().At("10:30").At("08:00").Do(func(){ ... }) - -// Schedule each last day of the month -s.Every(1).MonthLastDay().Do(func(){ ... }) - -// Or each last day of every other month -s.Every(2).MonthLastDay().Do(func(){ ... }) - -// cron expressions supported -s.Cron("*/1 * * * *").Do(task) // every minute - -// cron second-level expressions supported -s.CronWithSeconds("*/1 * * * * *").Do(task) // every second - -// you can start running the scheduler in two different ways: -// starts the scheduler asynchronously -s.StartAsync() -// starts the scheduler and blocks current execution path -s.StartBlocking() -``` - -For more examples, take a look in our [go docs](https://pkg.go.dev/github.com/go-co-op/gocron#pkg-examples) - -## Options - -| Interval | Supported schedule options | -| ------------ | ------------------------------------------------------------------- | -| sub-second | `StartAt()` | -| milliseconds | `StartAt()` | -| seconds | `StartAt()` | -| minutes | `StartAt()` | -| hours | `StartAt()` | -| days | `StartAt()`, `At()` | -| weeks | `StartAt()`, `At()`, `Weekday()` (and all week day named functions) | -| months | `StartAt()`, `At()` | - -There are several options available to restrict how jobs run: - -| Mode | Function | Behavior | -|---------------------|---------------------------|------------------------------------------------------------------------------------------------------| -| Default | | jobs are rescheduled at every interval | -| Job singleton | `SingletonMode()` | a long running job will not be rescheduled until the current run is completed | -| Scheduler limit | `SetMaxConcurrentJobs()` | set a collective maximum number of concurrent jobs running across the scheduler | -| Distributed locking | `WithDistributedLocker()` | prevents the same job from being run more than once when running multiple instances of the scheduler | - -## Distributed Locker Implementations - -- Redis: [redislock](https://github.com/go-co-op/gocron-redis-lock) `go get github.com/go-co-op/gocron-redis-lock` - -## Tags - -Jobs may have arbitrary tags added which can be useful when tracking many jobs. -The scheduler supports both enforcing tags to be unique and when not unique, -running all jobs with a given tag. - -```golang -s := gocron.NewScheduler(time.UTC) -s.TagsUnique() - -_, _ = s.Every(1).Week().Tag("foo").Do(task) -_, err := s.Every(1).Week().Tag("foo").Do(task) -// error!!! - -s := gocron.NewScheduler(time.UTC) - -s.Every(2).Day().Tag("tag").At("10:00").Do(task) -s.Every(1).Minute().Tag("tag").Do(task) -s.RunByTag("tag") -// both jobs will run -``` - -## FAQ - -- Q: I'm running multiple pods on a distributed environment. How can I make a job not run once per pod causing duplication? - - We recommend using your own lock solution within the jobs themselves (you could use [Redis](https://redis.io/topics/distlock), for example) - - A2: Use the scheduler option `WithDistributedLocker` and either use an implemented [backend](#distributed-locker-implementations) - or implement your own and contribute it back in a PR! - -- Q: I've removed my job from the scheduler, but how can I stop a long-running job that has already been triggered? - - A: We recommend using a means of canceling your job, e.g. a `context.WithCancel()`. - - A2: You can listen to the job context Done channel to know when the job has been canceled - ```golang - task := func(in string, job gocron.Job) { - fmt.Printf("this job's last run: %s this job's next run: %s\n", job.LastRun(), job.NextRun()) - fmt.Printf("in argument is %s\n", in) - - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - - for { - select { - case <-job.Context().Done(): - fmt.Printf("function has been canceled, performing cleanup and exiting gracefully\n") - return - case <-ticker.C: - fmt.Printf("performing a hard job that takes a long time that I want to kill whenever I want\n") - } - } - } - - var err error - s := gocron.NewScheduler(time.UTC) - s.SingletonModeAll() - j, err := s.Every(1).Hour().Tag("myJob").DoWithJobDetails(task, "foo") - if err != nil { - log.Fatalln("error scheduling job", err) - } - - s.StartAsync() - - // Simulate some more work - time.Sleep(time.Second) - - // I want to stop the job, together with the underlying goroutine - fmt.Printf("now I want to kill the job\n") - err = s.RemoveByTag("myJob") - if err != nil { - log.Fatalln("error removing job by tag", err) - } - - // Wait a bit so that we can see that the job is exiting gracefully - time.Sleep(time.Second) - fmt.Printf("Job: %#v, Error: %#v", j, err) - ``` - ---- - -Looking to contribute? Try to follow these guidelines: - -- Use issues for everything -- For a small change, just send a PR! -- For bigger changes, please open an issue for discussion before sending a PR. -- PRs should have: tests, documentation and examples (if it makes sense) -- You can also contribute by: - - Reporting issues - - Suggesting new features or enhancements - - Improving/fixing documentation - ---- - -## Design - -![design-diagram](https://user-images.githubusercontent.com/19351306/110375142-2ba88680-8017-11eb-80c3-554cc746b165.png) - -[Jetbrains](https://www.jetbrains.com/?from=gocron) supports this project with GoLand licenses. We appreciate their support for free and open source software! - -## Star History - -[![Star History Chart](https://api.star-history.com/svg?repos=go-co-op/gocron&type=Date)](https://star-history.com/#go-co-op/gocron&Date) - - diff --git a/vendor/github.com/go-co-op/gocron/SECURITY.md b/vendor/github.com/go-co-op/gocron/SECURITY.md deleted file mode 100644 index 6b986412..00000000 --- a/vendor/github.com/go-co-op/gocron/SECURITY.md +++ /dev/null @@ -1,15 +0,0 @@ -# Security Policy - -## Supported Versions - -The current plan is to maintain version 1 as long as possible incorporating any necessary security patches. - -| Version | Supported | -| ------- | ------------------ | -| 1.x.x | :white_check_mark: | - -## Reporting a Vulnerability - -Vulnerabilities can be reported by [opening an issue](https://github.com/go-co-op/gocron/issues/new/choose) or reaching out on Slack: [](https://gophers.slack.com/archives/CQ7T0T1FW) - -We will do our best to addrerss any vulnerabilities in an expeditious manner. diff --git a/vendor/github.com/go-co-op/gocron/executor.go b/vendor/github.com/go-co-op/gocron/executor.go deleted file mode 100644 index 33dc9d69..00000000 --- a/vendor/github.com/go-co-op/gocron/executor.go +++ /dev/null @@ -1,252 +0,0 @@ -package gocron - -import ( - "context" - "sync" - "time" - - "go.uber.org/atomic" -) - -const ( - // RescheduleMode - the default is that if a limit on maximum - // concurrent jobs is set and the limit is reached, a job will - // skip it's run and try again on the next occurrence in the schedule - RescheduleMode limitMode = iota - - // WaitMode - if a limit on maximum concurrent jobs is set - // and the limit is reached, a job will wait to try and run - // until a spot in the limit is freed up. - // - // Note: this mode can produce unpredictable results as - // job execution order isn't guaranteed. For example, a job that - // executes frequently may pile up in the wait queue and be executed - // many times back to back when the queue opens. - // - // Warning: do not use this mode if your jobs will continue to stack - // up beyond the ability of the limit workers to keep up. An example of - // what NOT to do: - // - // s.Every("1s").Do(func() { - // // this will result in an ever-growing number of goroutines - // // blocked trying to send to the buffered channel - // time.Sleep(10 * time.Minute) - // }) - - WaitMode -) - -type executor struct { - jobFunctions chan jobFunction // the chan upon which the jobFunctions are passed in from the scheduler - ctx context.Context // used to tell the executor to stop - cancel context.CancelFunc // used to tell the executor to stop - wg *sync.WaitGroup // used by the scheduler to wait for the executor to stop - jobsWg *sync.WaitGroup // used by the executor to wait for all jobs to finish - singletonWgs *sync.Map // used by the executor to wait for the singleton runners to complete - - limitMode limitMode // when SetMaxConcurrentJobs() is set upon the scheduler - limitModeMaxRunningJobs int // stores the maximum number of concurrently running jobs - limitModeFuncsRunning *atomic.Int64 // tracks the count of limited mode funcs running - limitModeFuncWg *sync.WaitGroup // allow the executor to wait for limit mode functions to wrap up - limitModeQueue chan jobFunction // pass job functions to the limit mode workers - limitModeQueueMu *sync.Mutex // for protecting the limitModeQueue - limitModeRunningJobs *atomic.Int64 // tracks the count of running jobs to check against the max - stopped *atomic.Bool // allow workers to drain the buffered limitModeQueue - - distributedLocker Locker // support running jobs across multiple instances -} - -func newExecutor() executor { - e := executor{ - jobFunctions: make(chan jobFunction, 1), - singletonWgs: &sync.Map{}, - limitModeFuncsRunning: atomic.NewInt64(0), - limitModeFuncWg: &sync.WaitGroup{}, - limitModeRunningJobs: atomic.NewInt64(0), - limitModeQueueMu: &sync.Mutex{}, - } - return e -} - -func runJob(f jobFunction) { - f.runStartCount.Add(1) - f.isRunning.Store(true) - callJobFunc(f.eventListeners.onBeforeJobExecution) - callJobFuncWithParams(f.function, f.parameters) - callJobFunc(f.eventListeners.onAfterJobExecution) - f.isRunning.Store(false) - f.runFinishCount.Add(1) -} - -func (jf *jobFunction) singletonRunner() { - jf.singletonRunnerOn.Store(true) - jf.singletonWg.Add(1) - for { - select { - case <-jf.ctx.Done(): - jf.singletonWg.Done() - jf.singletonRunnerOn.Store(false) - jf.singletonQueue = make(chan struct{}, 1000) - jf.stopped.Store(false) - return - case <-jf.singletonQueue: - if !jf.stopped.Load() { - runJob(*jf) - } - } - } -} - -func (e *executor) limitModeRunner() { - for { - select { - case <-e.ctx.Done(): - e.limitModeFuncsRunning.Inc() - e.limitModeFuncWg.Done() - return - case jf := <-e.limitModeQueue: - if !e.stopped.Load() { - e.runJob(jf) - } - } - } -} - -func (e *executor) start() { - e.wg = &sync.WaitGroup{} - e.wg.Add(1) - - stopCtx, cancel := context.WithCancel(context.Background()) - e.ctx = stopCtx - e.cancel = cancel - - e.jobsWg = &sync.WaitGroup{} - - e.stopped = atomic.NewBool(false) - - e.limitModeQueueMu.Lock() - e.limitModeQueue = make(chan jobFunction, 1000) - e.limitModeQueueMu.Unlock() - go e.run() -} - -func (e *executor) runJob(f jobFunction) { - switch f.runConfig.mode { - case defaultMode: - lockKey := f.jobName - if lockKey == "" { - lockKey = f.funcName - } - if e.distributedLocker != nil { - l, err := e.distributedLocker.Lock(f.ctx, lockKey) - if err != nil || l == nil { - return - } - defer func() { - durationToNextRun := time.Until(f.jobFuncNextRun) - if durationToNextRun > time.Second*5 { - durationToNextRun = time.Second * 5 - } - if durationToNextRun > time.Millisecond*100 { - timeToSleep := time.Duration(float64(durationToNextRun) * 0.9) - select { - case <-e.ctx.Done(): - case <-time.After(timeToSleep): - } - } - _ = l.Unlock(f.ctx) - }() - } - runJob(f) - case singletonMode: - e.singletonWgs.Store(f.singletonWg, struct{}{}) - - if !f.singletonRunnerOn.Load() { - go f.singletonRunner() - } - f.singletonQueue <- struct{}{} - } -} - -func (e *executor) run() { - for { - select { - case f := <-e.jobFunctions: - if e.stopped.Load() { - continue - } - - if e.limitModeMaxRunningJobs > 0 { - countRunning := e.limitModeFuncsRunning.Load() - if countRunning < int64(e.limitModeMaxRunningJobs) { - diff := int64(e.limitModeMaxRunningJobs) - countRunning - for i := int64(0); i < diff; i++ { - e.limitModeFuncWg.Add(1) - go e.limitModeRunner() - e.limitModeFuncsRunning.Inc() - } - } - } - - e.jobsWg.Add(1) - go func() { - defer e.jobsWg.Done() - - panicHandlerMutex.RLock() - defer panicHandlerMutex.RUnlock() - - if panicHandler != nil { - defer func() { - if r := recover(); r != nil { - panicHandler(f.funcName, r) - } - }() - } - - if e.limitModeMaxRunningJobs > 0 { - switch e.limitMode { - case RescheduleMode: - if e.limitModeRunningJobs.Load() < int64(e.limitModeMaxRunningJobs) { - select { - case e.limitModeQueue <- f: - case <-e.ctx.Done(): - } - } - case WaitMode: - select { - case e.limitModeQueue <- f: - case <-e.ctx.Done(): - } - } - return - } - - e.runJob(f) - }() - case <-e.ctx.Done(): - e.jobsWg.Wait() - e.wg.Done() - return - } - } -} - -func (e *executor) stop() { - e.stopped.Store(true) - e.cancel() - e.wg.Wait() - if e.singletonWgs != nil { - e.singletonWgs.Range(func(key, value interface{}) bool { - if wg, ok := key.(*sync.WaitGroup); ok { - wg.Wait() - } - return true - }) - } - if e.limitModeMaxRunningJobs > 0 { - e.limitModeFuncWg.Wait() - e.limitModeQueueMu.Lock() - e.limitModeQueue = nil - e.limitModeQueueMu.Unlock() - } -} diff --git a/vendor/github.com/go-co-op/gocron/gocron.go b/vendor/github.com/go-co-op/gocron/gocron.go deleted file mode 100644 index a05cae4d..00000000 --- a/vendor/github.com/go-co-op/gocron/gocron.go +++ /dev/null @@ -1,129 +0,0 @@ -// Package gocron : A Golang Job Scheduling Package. -// -// An in-process scheduler for periodic jobs that uses the builder pattern -// for configuration. gocron lets you run Golang functions periodically -// at pre-determined intervals using a simple, human-friendly syntax. -package gocron - -import ( - "errors" - "fmt" - "reflect" - "regexp" - "runtime" - "sync" - "time" -) - -// PanicHandlerFunc represents a type that can be set to handle panics occurring -// during job execution. -type PanicHandlerFunc func(jobName string, recoverData interface{}) - -// The global panic handler -var ( - panicHandler PanicHandlerFunc - panicHandlerMutex = sync.RWMutex{} -) - -// SetPanicHandler sets the global panicHandler to the given function. -// Leaving it nil or setting it to nil disables automatic panic handling. -// If the panicHandler is not nil, any panic that occurs during executing a job will be recovered -// and the panicHandlerFunc will be called with the job's funcName and the recover data. -func SetPanicHandler(handler PanicHandlerFunc) { - panicHandlerMutex.Lock() - defer panicHandlerMutex.Unlock() - panicHandler = handler -} - -// Error declarations for gocron related errors -var ( - ErrNotAFunction = errors.New("gocron: only functions can be scheduled into the job queue") - ErrNotScheduledWeekday = errors.New("gocron: job not scheduled weekly on a weekday") - ErrJobNotFoundWithTag = errors.New("gocron: no jobs found with given tag") - ErrUnsupportedTimeFormat = errors.New("gocron: the given time format is not supported") - ErrInvalidInterval = errors.New("gocron: .Every() interval must be greater than 0") - ErrInvalidIntervalType = errors.New("gocron: .Every() interval must be int, time.Duration, or string") - ErrInvalidIntervalUnitsSelection = errors.New("gocron: .Every(time.Duration) and .Cron() cannot be used with units (e.g. .Seconds())") - ErrInvalidFunctionParameters = errors.New("gocron: length of function parameters must match job function parameters") - - ErrAtTimeNotSupported = errors.New("gocron: the At() method is not supported for this time unit") - ErrWeekdayNotSupported = errors.New("gocron: weekday is not supported for time unit") - ErrInvalidDayOfMonthEntry = errors.New("gocron: only days 1 through 28 are allowed for monthly schedules") - ErrTagsUnique = func(tag string) error { return fmt.Errorf("gocron: a non-unique tag was set on the job: %s", tag) } - ErrWrongParams = errors.New("gocron: wrong list of params") - ErrDoWithJobDetails = errors.New("gocron: DoWithJobDetails expects a function whose last parameter is a gocron.Job") - ErrUpdateCalledWithoutJob = errors.New("gocron: a call to Scheduler.Update() requires a call to Scheduler.Job() first") - ErrCronParseFailure = errors.New("gocron: cron expression failed to be parsed") - ErrInvalidDaysOfMonthDuplicateValue = errors.New("gocron: duplicate days of month is not allowed in Month() and Months() methods") -) - -func wrapOrError(toWrap error, err error) error { - var returnErr error - if toWrap != nil && !errors.Is(err, toWrap) { - returnErr = fmt.Errorf("%s: %w", err, toWrap) - } else { - returnErr = err - } - return returnErr -} - -// regex patterns for supported time formats -var ( - timeWithSeconds = regexp.MustCompile(`(?m)^\d{1,2}:\d\d:\d\d$`) - timeWithoutSeconds = regexp.MustCompile(`(?m)^\d{1,2}:\d\d$`) -) - -type schedulingUnit int - -const ( - // default unit is seconds - milliseconds schedulingUnit = iota - seconds - minutes - hours - days - weeks - months - duration - crontab -) - -func callJobFunc(jobFunc interface{}) { - if jobFunc != nil { - reflect.ValueOf(jobFunc).Call([]reflect.Value{}) - } -} - -func callJobFuncWithParams(jobFunc interface{}, params []interface{}) { - f := reflect.ValueOf(jobFunc) - if len(params) != f.Type().NumIn() { - return - } - in := make([]reflect.Value, len(params)) - for k, param := range params { - in[k] = reflect.ValueOf(param) - } - f.Call(in) -} - -func getFunctionName(fn interface{}) string { - return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() -} - -func parseTime(t string) (hour, min, sec int, err error) { - var timeLayout string - switch { - case timeWithSeconds.Match([]byte(t)): - timeLayout = "15:04:05" - case timeWithoutSeconds.Match([]byte(t)): - timeLayout = "15:04" - default: - return 0, 0, 0, ErrUnsupportedTimeFormat - } - - parsedTime, err := time.Parse(timeLayout, t) - if err != nil { - return 0, 0, 0, ErrUnsupportedTimeFormat - } - return parsedTime.Hour(), parsedTime.Minute(), parsedTime.Second(), nil -} diff --git a/vendor/github.com/go-co-op/gocron/job.go b/vendor/github.com/go-co-op/gocron/job.go deleted file mode 100644 index 87918fa6..00000000 --- a/vendor/github.com/go-co-op/gocron/job.go +++ /dev/null @@ -1,508 +0,0 @@ -package gocron - -import ( - "context" - "fmt" - "math/rand" - "sort" - "sync" - "time" - - "github.com/robfig/cron/v3" - "go.uber.org/atomic" -) - -// Job struct stores the information necessary to run a Job -type Job struct { - mu *jobMutex - jobFunction - interval int // interval * unit between runs - random // details for randomness - duration time.Duration // time duration between runs - unit schedulingUnit // time units, e.g. 'minutes', 'hours'... - startsImmediately bool // if the Job should run upon scheduler start - atTimes []time.Duration // optional time(s) at which this Job runs when interval is day - startAtTime time.Time // optional time at which the Job starts - error error // error related to Job - lastRun time.Time // datetime of last run - nextRun time.Time // datetime of next run - scheduledWeekdays []time.Weekday // Specific days of the week to start on - daysOfTheMonth []int // Specific days of the month to run the job - tags []string // allow the user to tag Jobs with certain labels - timer *time.Timer // handles running tasks at specific time - cronSchedule cron.Schedule // stores the schedule when a task uses cron - runWithDetails bool // when true the job is passed as the last arg of the jobFunc -} - -type random struct { - rand *rand.Rand - randomizeInterval bool // whether the interval is random - randomIntervalRange [2]int // random interval range -} - -type jobFunction struct { - eventListeners // additional functions to allow run 'em during job performing - function interface{} // task's function - parameters []interface{} // task's function parameters - parametersLen int // length of the passed parameters - jobName string // key of the distributed lock - funcName string // the name of the function - e.g. main.func1 - runConfig runConfig // configuration for how many times to run the job - singletonQueue chan struct{} // queues jobs for the singleton runner to handle - singletonRunnerOn *atomic.Bool // whether the runner function for singleton is running - ctx context.Context // for cancellation - cancel context.CancelFunc // for cancellation - isRunning *atomic.Bool // whether the job func is currently being run - runStartCount *atomic.Int64 // number of times the job was started - runFinishCount *atomic.Int64 // number of times the job was finished - singletonWg *sync.WaitGroup // used by singleton runner - stopped *atomic.Bool // tracks whether the job is currently stopped - jobFuncNextRun time.Time // the next time the job is scheduled to run -} - -type eventListeners struct { - onBeforeJobExecution interface{} // performs before job executing - onAfterJobExecution interface{} // performs after job executing -} - -type jobMutex struct { - sync.RWMutex -} - -func (jf *jobFunction) copy() jobFunction { - cp := jobFunction{ - eventListeners: jf.eventListeners, - function: jf.function, - parameters: nil, - parametersLen: jf.parametersLen, - funcName: jf.funcName, - jobName: jf.jobName, - runConfig: jf.runConfig, - singletonQueue: jf.singletonQueue, - ctx: jf.ctx, - cancel: jf.cancel, - isRunning: jf.isRunning, - runStartCount: jf.runStartCount, - runFinishCount: jf.runFinishCount, - singletonWg: jf.singletonWg, - singletonRunnerOn: jf.singletonRunnerOn, - stopped: jf.stopped, - jobFuncNextRun: jf.jobFuncNextRun, - } - cp.parameters = append(cp.parameters, jf.parameters...) - return cp -} - -type runConfig struct { - finiteRuns bool - maxRuns int - mode mode -} - -// mode is the Job's running mode -type mode int8 - -const ( - // defaultMode disable any mode - defaultMode mode = iota - - // singletonMode switch to single job mode - singletonMode -) - -// newJob creates a new Job with the provided interval -func newJob(interval int, startImmediately bool, singletonMode bool) *Job { - ctx, cancel := context.WithCancel(context.Background()) - job := &Job{ - mu: &jobMutex{}, - interval: interval, - unit: seconds, - lastRun: time.Time{}, - nextRun: time.Time{}, - jobFunction: jobFunction{ - ctx: ctx, - cancel: cancel, - isRunning: atomic.NewBool(false), - runStartCount: atomic.NewInt64(0), - runFinishCount: atomic.NewInt64(0), - singletonRunnerOn: atomic.NewBool(false), - stopped: atomic.NewBool(false), - }, - tags: []string{}, - startsImmediately: startImmediately, - } - if singletonMode { - job.SingletonMode() - } - return job -} - -// Name sets the name of the current job. -// -// If the scheduler is running using WithDistributedLocker(), -// the job name is used as the distributed lock key. -func (j *Job) Name(name string) { - j.mu.Lock() - defer j.mu.Unlock() - j.jobName = name -} - -func (j *Job) setRandomInterval(a, b int) { - j.random.rand = rand.New(rand.NewSource(time.Now().UnixNano())) // nolint - - j.random.randomizeInterval = true - if a < b { - j.random.randomIntervalRange[0] = a - j.random.randomIntervalRange[1] = b + 1 - } else { - j.random.randomIntervalRange[0] = b - j.random.randomIntervalRange[1] = a + 1 - } -} - -func (j *Job) getRandomInterval() int { - randNum := j.rand.Intn(j.randomIntervalRange[1] - j.randomIntervalRange[0]) - return j.randomIntervalRange[0] + randNum -} - -func (j *Job) getInterval() int { - if j.randomizeInterval { - return j.getRandomInterval() - } - return j.interval -} - -func (j *Job) neverRan() bool { - jobLastRun := j.LastRun() - return jobLastRun.IsZero() -} - -func (j *Job) getStartsImmediately() bool { - return j.startsImmediately -} - -func (j *Job) setStartsImmediately(b bool) { - j.startsImmediately = b -} - -func (j *Job) setTimer(t *time.Timer) { - j.mu.Lock() - defer j.mu.Unlock() - j.timer = t -} - -func (j *Job) getFirstAtTime() time.Duration { - var t time.Duration - if len(j.atTimes) > 0 { - t = j.atTimes[0] - } - - return t -} - -func (j *Job) getAtTime(lastRun time.Time) time.Duration { - var r time.Duration - if len(j.atTimes) == 0 { - return r - } - - if len(j.atTimes) == 1 { - return j.atTimes[0] - } - - if lastRun.IsZero() { - r = j.atTimes[0] - } else { - for _, d := range j.atTimes { - nt := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day(), 0, 0, 0, 0, lastRun.Location()).Add(d) - if nt.After(lastRun) { - r = d - break - } - } - } - - return r -} - -func (j *Job) addAtTime(t time.Duration) { - if len(j.atTimes) == 0 { - j.atTimes = append(j.atTimes, t) - return - } - exist := false - index := sort.Search(len(j.atTimes), func(i int) bool { - atTime := j.atTimes[i] - b := atTime >= t - if b { - exist = atTime == t - } - return b - }) - - // ignore if present - if exist { - return - } - - j.atTimes = append(j.atTimes, time.Duration(0)) - copy(j.atTimes[index+1:], j.atTimes[index:]) - j.atTimes[index] = t -} - -func (j *Job) getStartAtTime() time.Time { - return j.startAtTime -} - -func (j *Job) setStartAtTime(t time.Time) { - j.startAtTime = t -} - -func (j *Job) getUnit() schedulingUnit { - j.mu.RLock() - defer j.mu.RUnlock() - return j.unit -} - -func (j *Job) setUnit(t schedulingUnit) { - j.mu.Lock() - defer j.mu.Unlock() - j.unit = t -} - -func (j *Job) getDuration() time.Duration { - j.mu.RLock() - defer j.mu.RUnlock() - return j.duration -} - -func (j *Job) setDuration(t time.Duration) { - j.mu.Lock() - defer j.mu.Unlock() - j.duration = t -} - -// hasTags returns true if all tags are matched on this Job -func (j *Job) hasTags(tags ...string) bool { - // Build map of all Job tags for easy comparison - jobTags := map[string]int{} - for _, tag := range j.tags { - jobTags[tag] = 0 - } - - // Loop through required tags and if one doesn't exist, return false - for _, tag := range tags { - _, ok := jobTags[tag] - if !ok { - return false - } - } - return true -} - -// Error returns an error if one occurred while creating the Job. -// If multiple errors occurred, they will be wrapped and can be -// checked using the standard unwrap options. -func (j *Job) Error() error { - return j.error -} - -// Context returns the job's context. The context controls cancellation. -func (j *Job) Context() context.Context { - return j.ctx -} - -// Tag allows you to add arbitrary labels to a Job that do not -// impact the functionality of the Job -func (j *Job) Tag(tags ...string) { - j.tags = append(j.tags, tags...) -} - -// Untag removes a tag from a Job -func (j *Job) Untag(t string) { - var newTags []string - for _, tag := range j.tags { - if t != tag { - newTags = append(newTags, tag) - } - } - - j.tags = newTags -} - -// Tags returns the tags attached to the Job -func (j *Job) Tags() []string { - return j.tags -} - -// SetEventListeners accepts two functions that will be called, one before and one after the job is run -func (j *Job) SetEventListeners(onBeforeJobExecution interface{}, onAfterJobExecution interface{}) { - j.eventListeners = eventListeners{ - onBeforeJobExecution: onBeforeJobExecution, - onAfterJobExecution: onAfterJobExecution, - } -} - -// ScheduledTime returns the time of the Job's next scheduled run -func (j *Job) ScheduledTime() time.Time { - j.mu.RLock() - defer j.mu.RUnlock() - return j.nextRun -} - -// ScheduledAtTime returns the specific time of day the Job will run at. -// If multiple times are set, the earliest time will be returned. -func (j *Job) ScheduledAtTime() string { - if len(j.atTimes) == 0 { - return "00:00" - } - - return fmt.Sprintf("%02d:%02d", j.getFirstAtTime()/time.Hour, (j.getFirstAtTime()%time.Hour)/time.Minute) -} - -// ScheduledAtTimes returns the specific times of day the Job will run at -func (j *Job) ScheduledAtTimes() []string { - r := make([]string, len(j.atTimes)) - for i, t := range j.atTimes { - r[i] = fmt.Sprintf("%02d:%02d", t/time.Hour, (t%time.Hour)/time.Minute) - } - - return r -} - -// Weekday returns which day of the week the Job will run on and -// will return an error if the Job is not scheduled weekly -func (j *Job) Weekday() (time.Weekday, error) { - if len(j.scheduledWeekdays) == 0 { - return time.Sunday, ErrNotScheduledWeekday - } - return j.scheduledWeekdays[0], nil -} - -// Weekdays returns a slice of time.Weekday that the Job will run in a week and -// will return an error if the Job is not scheduled weekly -func (j *Job) Weekdays() []time.Weekday { - // appending on j.scheduledWeekdays may cause a side effect - if len(j.scheduledWeekdays) == 0 { - return []time.Weekday{time.Sunday} - } - - return j.scheduledWeekdays -} - -// LimitRunsTo limits the number of executions of this job to n. -// Upon reaching the limit, the job is removed from the scheduler. -// -// Note: If a job is added to a running scheduler and this method is then used -// you may see the job run more than the set limit as job is scheduled immediately -// by default upon being added to the scheduler. It is recommended to use the -// LimitRunsTo() func on the scheduler chain when scheduling the job. -// For example: scheduler.LimitRunsTo(1).Do() -func (j *Job) LimitRunsTo(n int) { - j.mu.Lock() - defer j.mu.Unlock() - j.runConfig.finiteRuns = true - j.runConfig.maxRuns = n -} - -// SingletonMode prevents a new job from starting if the prior job has not yet -// completed it's run -// Note: If a job is added to a running scheduler and this method is then used -// you may see the job run overrun itself as job is scheduled immediately -// by default upon being added to the scheduler. It is recommended to use the -// SingletonMode() func on the scheduler chain when scheduling the job. -func (j *Job) SingletonMode() { - j.mu.Lock() - defer j.mu.Unlock() - j.runConfig.mode = singletonMode - j.jobFunction.singletonWg = &sync.WaitGroup{} - j.jobFunction.singletonQueue = make(chan struct{}, 100) -} - -// shouldRun evaluates if this job should run again -// based on the runConfig -func (j *Job) shouldRun() bool { - j.mu.RLock() - defer j.mu.RUnlock() - return !j.runConfig.finiteRuns || j.runStartCount.Load() < int64(j.runConfig.maxRuns) -} - -// LastRun returns the time the job was run last -func (j *Job) LastRun() time.Time { - j.mu.RLock() - defer j.mu.RUnlock() - return j.lastRun -} - -func (j *Job) setLastRun(t time.Time) { - j.lastRun = t -} - -// NextRun returns the time the job will run next -func (j *Job) NextRun() time.Time { - j.mu.RLock() - defer j.mu.RUnlock() - return j.nextRun -} - -func (j *Job) setNextRun(t time.Time) { - j.mu.Lock() - defer j.mu.Unlock() - j.nextRun = t - j.jobFunction.jobFuncNextRun = t -} - -// RunCount returns the number of times the job has been started -func (j *Job) RunCount() int { - j.mu.Lock() - defer j.mu.Unlock() - return int(j.runStartCount.Load()) -} - -// FinishedRunCount returns the number of times the job has finished running -func (j *Job) FinishedRunCount() int { - j.mu.Lock() - defer j.mu.Unlock() - return int(j.runFinishCount.Load()) -} - -func (j *Job) stop() { - j.mu.Lock() - defer j.mu.Unlock() - if j.timer != nil { - j.timer.Stop() - } - if j.cancel != nil { - j.cancel() - j.ctx, j.cancel = context.WithCancel(context.Background()) - } - j.stopped.Store(true) -} - -// IsRunning reports whether any instances of the job function are currently running -func (j *Job) IsRunning() bool { - return j.isRunning.Load() -} - -// you must Lock the job before calling copy -func (j *Job) copy() Job { - return Job{ - mu: &jobMutex{}, - jobFunction: j.jobFunction, - interval: j.interval, - duration: j.duration, - unit: j.unit, - startsImmediately: j.startsImmediately, - atTimes: j.atTimes, - startAtTime: j.startAtTime, - error: j.error, - lastRun: j.lastRun, - nextRun: j.nextRun, - scheduledWeekdays: j.scheduledWeekdays, - daysOfTheMonth: j.daysOfTheMonth, - tags: j.tags, - timer: j.timer, - cronSchedule: j.cronSchedule, - runWithDetails: j.runWithDetails, - } -} diff --git a/vendor/github.com/go-co-op/gocron/locker.go b/vendor/github.com/go-co-op/gocron/locker.go deleted file mode 100644 index dc713f9b..00000000 --- a/vendor/github.com/go-co-op/gocron/locker.go +++ /dev/null @@ -1,23 +0,0 @@ -package gocron - -import ( - "context" - "errors" -) - -var ( - ErrFailedToConnectToRedis = errors.New("gocron: failed to connect to redis") - ErrFailedToObtainLock = errors.New("gocron: failed to obtain lock") - ErrFailedToReleaseLock = errors.New("gocron: failed to release lock") -) - -// Locker represents the required interface to lock jobs when running multiple schedulers. -type Locker interface { - // Lock if an error is returned by lock, the job will not be scheduled. - Lock(ctx context.Context, key string) (Lock, error) -} - -// Lock represents an obtained lock -type Lock interface { - Unlock(ctx context.Context) error -} diff --git a/vendor/github.com/go-co-op/gocron/scheduler.go b/vendor/github.com/go-co-op/gocron/scheduler.go deleted file mode 100644 index 304c758a..00000000 --- a/vendor/github.com/go-co-op/gocron/scheduler.go +++ /dev/null @@ -1,1426 +0,0 @@ -package gocron - -import ( - "context" - "fmt" - "reflect" - "sort" - "strings" - "sync" - "time" - - "github.com/robfig/cron/v3" - "go.uber.org/atomic" -) - -type limitMode int8 - -// Scheduler struct stores a list of Jobs and the location of time used by the Scheduler, -// and implements the sort. any for sorting Jobs, by the time of jobFuncNextRun -type Scheduler struct { - jobsMutex sync.RWMutex - jobs []*Job - - locationMutex sync.RWMutex - location *time.Location - running *atomic.Bool // represents if the scheduler is running at the moment or not - - time TimeWrapper // wrapper around time.Time - timer func(d time.Duration, f func()) *time.Timer - executor *executor // executes jobs passed via chan - - tags sync.Map // for storing tags when unique tags is set - - tagsUnique bool // defines whether tags should be unique - updateJob bool // so the scheduler knows to create a new job or update the current - waitForInterval bool // defaults jobs to waiting for first interval to start - singletonMode bool // defaults all jobs to use SingletonMode() - - startBlockingStopChanMutex sync.Mutex - startBlockingStopChan chan struct{} // stops the scheduler - - // tracks whether we're in a chain of scheduling methods for a job - // a chain is started with any of the scheduler methods that operate - // upon a job and are ended with one of [ Do(), Update() ] - note that - // Update() calls Do(), so really they all end with Do(). - // This allows the caller to begin with any job related scheduler method - // and only with one of [ Every(), EveryRandom(), Cron(), CronWithSeconds(), MonthFirstWeekday() ] - inScheduleChain bool -} - -// days in a week -const allWeekDays = 7 - -// NewScheduler creates a new Scheduler -func NewScheduler(loc *time.Location) *Scheduler { - executor := newExecutor() - - return &Scheduler{ - jobs: make([]*Job, 0), - location: loc, - running: atomic.NewBool(false), - time: &trueTime{}, - executor: &executor, - tagsUnique: false, - timer: afterFunc, - } -} - -// SetMaxConcurrentJobs limits how many jobs can be running at the same time. -// This is useful when running resource intensive jobs and a precise start time is not critical. -// -// Note: WaitMode and RescheduleMode provide details on usage and potential risks. -func (s *Scheduler) SetMaxConcurrentJobs(n int, mode limitMode) { - s.executor.limitModeMaxRunningJobs = n - s.executor.limitMode = mode -} - -// StartBlocking starts all jobs and blocks the current thread. -// This blocking method can be stopped with Stop() from a separate goroutine. -func (s *Scheduler) StartBlocking() { - s.StartAsync() - s.startBlockingStopChanMutex.Lock() - s.startBlockingStopChan = make(chan struct{}, 1) - s.startBlockingStopChanMutex.Unlock() - - <-s.startBlockingStopChan - - s.startBlockingStopChanMutex.Lock() - s.startBlockingStopChan = nil - s.startBlockingStopChanMutex.Unlock() -} - -// StartAsync starts all jobs without blocking the current thread -func (s *Scheduler) StartAsync() { - if !s.IsRunning() { - s.start() - } -} - -// start starts the scheduler, scheduling and running jobs -func (s *Scheduler) start() { - s.executor.start() - s.setRunning(true) - s.runJobs(s.Jobs()) -} - -func (s *Scheduler) runJobs(jobs []*Job) { - for _, job := range jobs { - ctx, cancel := context.WithCancel(context.Background()) - job.mu.Lock() - job.ctx = ctx - job.cancel = cancel - job.mu.Unlock() - s.runContinuous(job) - } -} - -func (s *Scheduler) setRunning(b bool) { - s.running.Store(b) -} - -// IsRunning returns true if the scheduler is running -func (s *Scheduler) IsRunning() bool { - return s.running.Load() -} - -// Jobs returns the list of Jobs from the Scheduler -func (s *Scheduler) Jobs() []*Job { - s.jobsMutex.RLock() - defer s.jobsMutex.RUnlock() - return s.jobs -} - -// Name sets the name of the current job. -// -// If the scheduler is running using WithDistributedLocker(), the job name is used -// as the distributed lock key. -func (s *Scheduler) Name(name string) *Scheduler { - job := s.getCurrentJob() - job.jobName = name - return s -} - -func (s *Scheduler) setJobs(jobs []*Job) { - s.jobsMutex.Lock() - defer s.jobsMutex.Unlock() - s.jobs = jobs -} - -// Len returns the number of Jobs in the Scheduler - implemented for sort -func (s *Scheduler) Len() int { - s.jobsMutex.RLock() - defer s.jobsMutex.RUnlock() - return len(s.jobs) -} - -// Swap places each job into the other job's position given -// the provided job indexes. -func (s *Scheduler) Swap(i, j int) { - s.jobsMutex.Lock() - defer s.jobsMutex.Unlock() - s.jobs[i], s.jobs[j] = s.jobs[j], s.jobs[i] -} - -// Less compares the next run of jobs based on their index. -// Returns true if the second job is after the first. -func (s *Scheduler) Less(first, second int) bool { - return s.Jobs()[second].NextRun().Unix() >= s.Jobs()[first].NextRun().Unix() -} - -// ChangeLocation changes the default time location -func (s *Scheduler) ChangeLocation(newLocation *time.Location) { - s.locationMutex.Lock() - defer s.locationMutex.Unlock() - s.location = newLocation -} - -// Location provides the current location set on the scheduler -func (s *Scheduler) Location() *time.Location { - s.locationMutex.RLock() - defer s.locationMutex.RUnlock() - return s.location -} - -type nextRun struct { - duration time.Duration - dateTime time.Time -} - -// scheduleNextRun Compute the instant when this Job should run next -func (s *Scheduler) scheduleNextRun(job *Job) (bool, nextRun) { - now := s.now() - if !s.jobPresent(job) { - return false, nextRun{} - } - - lastRun := now - - if job.neverRan() { - // Increment startAtTime to the future - if !job.startAtTime.IsZero() && job.startAtTime.Before(now) { - duration := s.durationToNextRun(job.startAtTime, job).duration - job.startAtTime = job.startAtTime.Add(duration) - if job.startAtTime.Before(now) { - diff := now.Sub(job.startAtTime) - duration := s.durationToNextRun(job.startAtTime, job).duration - count := diff / duration - if diff%duration != 0 { - count++ - } - job.startAtTime = job.startAtTime.Add(duration * count) - } - } - } else { - lastRun = job.NextRun() - } - - if !job.shouldRun() { - s.RemoveByReference(job) - return false, nextRun{} - } - - next := s.durationToNextRun(lastRun, job) - - jobNextRun := job.NextRun() - if jobNextRun.After(now) { - job.setLastRun(now) - } else { - job.setLastRun(jobNextRun) - } - - if next.dateTime.IsZero() { - next.dateTime = lastRun.Add(next.duration) - job.setNextRun(next.dateTime) - } else { - job.setNextRun(next.dateTime) - } - return true, next -} - -// durationToNextRun calculate how much time to the next run, depending on unit -func (s *Scheduler) durationToNextRun(lastRun time.Time, job *Job) nextRun { - // job can be scheduled with .StartAt() - if job.getFirstAtTime() == 0 && job.getStartAtTime().After(lastRun) { - sa := job.getStartAtTime() - job.addAtTime( - time.Duration(sa.Hour())*time.Hour + - time.Duration(sa.Minute())*time.Minute + - time.Duration(sa.Second())*time.Second, - ) - return nextRun{duration: job.getStartAtTime().Sub(s.now()), dateTime: job.getStartAtTime()} - } - - var next nextRun - switch job.getUnit() { - case milliseconds, seconds, minutes, hours: - next.duration = s.calculateDuration(job) - case days: - next = s.calculateDays(job, lastRun) - case weeks: - if len(job.scheduledWeekdays) != 0 { // weekday selected, Every().Monday(), for example - next = s.calculateWeekday(job, lastRun) - } else { - next = s.calculateWeeks(job, lastRun) - } - if next.dateTime.Before(job.getStartAtTime()) { - return s.durationToNextRun(job.getStartAtTime(), job) - } - case months: - next = s.calculateMonths(job, lastRun) - case duration: - next.duration = job.getDuration() - case crontab: - next.dateTime = job.cronSchedule.Next(lastRun) - next.duration = next.dateTime.Sub(lastRun) - } - return next -} - -func (s *Scheduler) calculateMonths(job *Job, lastRun time.Time) nextRun { - // Special case: the last day of the month - if len(job.daysOfTheMonth) == 1 && job.daysOfTheMonth[0] == -1 { - return calculateNextRunForLastDayOfMonth(s, job, lastRun) - } - - if len(job.daysOfTheMonth) != 0 { // calculate days to job.daysOfTheMonth - - nextRunDateMap := make(map[int]nextRun) - for _, day := range job.daysOfTheMonth { - nextRunDateMap[day] = calculateNextRunForMonth(s, job, lastRun, day) - } - - nextRunResult := nextRun{} - for _, val := range nextRunDateMap { - if nextRunResult.dateTime.IsZero() { - nextRunResult = val - } else if nextRunResult.dateTime.Sub(val.dateTime).Milliseconds() > 0 { - nextRunResult = val - } - } - - return nextRunResult - } - next := s.roundToMidnightAndAddDSTAware(lastRun, job.getFirstAtTime()).AddDate(0, job.getInterval(), 0) - return nextRun{duration: until(lastRun, next), dateTime: next} -} - -func calculateNextRunForLastDayOfMonth(s *Scheduler, job *Job, lastRun time.Time) nextRun { - // Calculate the last day of the next month, by adding job.interval+1 months (i.e. the - // first day of the month after the next month), and subtracting one day, unless the - // last run occurred before the end of the month. - addMonth := job.getInterval() - atTime := job.getAtTime(lastRun) - if testDate := lastRun.AddDate(0, 0, 1); testDate.Month() != lastRun.Month() && - !s.roundToMidnightAndAddDSTAware(lastRun, atTime).After(lastRun) { - // Our last run was on the last day of this month. - addMonth++ - atTime = job.getFirstAtTime() - } - - next := time.Date(lastRun.Year(), lastRun.Month(), 1, 0, 0, 0, 0, s.Location()). - Add(atTime). - AddDate(0, addMonth, 0). - AddDate(0, 0, -1) - return nextRun{duration: until(lastRun, next), dateTime: next} -} - -func calculateNextRunForMonth(s *Scheduler, job *Job, lastRun time.Time, dayOfMonth int) nextRun { - atTime := job.getAtTime(lastRun) - natTime := atTime - - hours, minutes, seconds := s.deconstructDuration(atTime) - jobDay := time.Date(lastRun.Year(), lastRun.Month(), dayOfMonth, hours, minutes, seconds, 0, s.Location()) - - difference := absDuration(lastRun.Sub(jobDay)) - next := lastRun - if jobDay.Before(lastRun) { // shouldn't run this month; schedule for next interval minus day difference - next = next.AddDate(0, job.getInterval(), -0) - next = next.Add(-difference) - natTime = job.getFirstAtTime() - } else { - if job.getInterval() == 1 && !jobDay.Equal(lastRun) { // every month counts current month - next = next.AddDate(0, job.getInterval()-1, 0) - } else { // should run next month interval - next = next.AddDate(0, job.getInterval(), 0) - natTime = job.getFirstAtTime() - } - next = next.Add(difference) - } - if atTime != natTime { - next = next.Add(-atTime).Add(natTime) - } - return nextRun{duration: until(lastRun, next), dateTime: next} -} - -func (s *Scheduler) calculateWeekday(job *Job, lastRun time.Time) nextRun { - daysToWeekday := s.remainingDaysToWeekday(lastRun, job) - totalDaysDifference := s.calculateTotalDaysDifference(lastRun, daysToWeekday, job) - acTime := job.getAtTime(lastRun) - if totalDaysDifference > 0 { - acTime = job.getFirstAtTime() - } - next := s.roundToMidnightAndAddDSTAware(lastRun, acTime).AddDate(0, 0, totalDaysDifference) - return nextRun{duration: until(lastRun, next), dateTime: next} -} - -func (s *Scheduler) calculateWeeks(job *Job, lastRun time.Time) nextRun { - totalDaysDifference := int(job.getInterval()) * 7 - - var next time.Time - - atTimes := job.atTimes - for _, at := range atTimes { - n := s.roundToMidnightAndAddDSTAware(lastRun, at) - if n.After(s.now()) { - next = n - break - } - } - - if next.IsZero() { - next = s.roundToMidnightAndAddDSTAware(lastRun, job.getFirstAtTime()).AddDate(0, 0, totalDaysDifference) - } - - return nextRun{duration: until(lastRun, next), dateTime: next} -} - -func (s *Scheduler) calculateTotalDaysDifference(lastRun time.Time, daysToWeekday int, job *Job) int { - if job.getInterval() > 1 { - // just count weeks after the first jobs were done - if job.RunCount() < len(job.Weekdays()) { - return daysToWeekday - } - if daysToWeekday > 0 { - return int(job.getInterval())*7 - (allWeekDays - daysToWeekday) - } - return int(job.getInterval()) * 7 - } - - if daysToWeekday == 0 { // today, at future time or already passed - lastRunAtTime := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day(), 0, 0, 0, 0, s.Location()).Add(job.getAtTime(lastRun)) - if lastRun.Before(lastRunAtTime) { - return 0 - } - return 7 - } - return daysToWeekday -} - -func (s *Scheduler) calculateDays(job *Job, lastRun time.Time) nextRun { - if job.getInterval() == 1 { - lastRunDayPlusJobAtTime := s.roundToMidnightAndAddDSTAware(lastRun, job.getAtTime(lastRun)) - - if shouldRunToday(lastRun, lastRunDayPlusJobAtTime) { - return nextRun{duration: until(lastRun, lastRunDayPlusJobAtTime), dateTime: lastRunDayPlusJobAtTime} - } - } - - nextRunAtTime := s.roundToMidnightAndAddDSTAware(lastRun, job.getFirstAtTime()).AddDate(0, 0, job.getInterval()).In(s.Location()) - return nextRun{duration: until(lastRun, nextRunAtTime), dateTime: nextRunAtTime} -} - -func until(from time.Time, until time.Time) time.Duration { - return until.Sub(from) -} - -func shouldRunToday(lastRun time.Time, atTime time.Time) bool { - return lastRun.Before(atTime) -} - -func in(scheduleWeekdays []time.Weekday, weekday time.Weekday) bool { - in := false - - for _, weekdayInSchedule := range scheduleWeekdays { - if int(weekdayInSchedule) == int(weekday) { - in = true - break - } - } - return in -} - -func (s *Scheduler) calculateDuration(job *Job) time.Duration { - interval := job.getInterval() - switch job.getUnit() { - case milliseconds: - return time.Duration(interval) * time.Millisecond - case seconds: - return time.Duration(interval) * time.Second - case minutes: - return time.Duration(interval) * time.Minute - default: - return time.Duration(interval) * time.Hour - } -} - -func (s *Scheduler) remainingDaysToWeekday(lastRun time.Time, job *Job) int { - weekDays := job.Weekdays() - sort.Slice(weekDays, func(i, j int) bool { - return weekDays[i] < weekDays[j] - }) - - equals := false - lastRunWeekday := lastRun.Weekday() - index := sort.Search(len(weekDays), func(i int) bool { - b := weekDays[i] >= lastRunWeekday - if b { - equals = weekDays[i] == lastRunWeekday - } - return b - }) - // check atTime - if equals { - if s.roundToMidnightAndAddDSTAware(lastRun, job.getAtTime(lastRun)).After(lastRun) { - return 0 - } - index++ - } - - if index < len(weekDays) { - return int(weekDays[index] - lastRunWeekday) - } - - return int(weekDays[0]) + allWeekDays - int(lastRunWeekday) -} - -// absDuration returns the abs time difference -func absDuration(a time.Duration) time.Duration { - if a >= 0 { - return a - } - return -a -} - -func (s *Scheduler) deconstructDuration(d time.Duration) (hours int, minutes int, seconds int) { - hours = int(d.Seconds()) / int(time.Hour/time.Second) - minutes = (int(d.Seconds()) % int(time.Hour/time.Second)) / int(time.Minute/time.Second) - seconds = int(d.Seconds()) % int(time.Minute/time.Second) - return -} - -// roundToMidnightAndAddDSTAware truncates time to midnight and "adds" duration in a DST aware manner -func (s *Scheduler) roundToMidnightAndAddDSTAware(t time.Time, d time.Duration) time.Time { - hours, minutes, seconds := s.deconstructDuration(d) - return time.Date(t.Year(), t.Month(), t.Day(), hours, minutes, seconds, 0, s.Location()) -} - -// NextRun datetime when the next Job should run. -func (s *Scheduler) NextRun() (*Job, time.Time) { - if len(s.Jobs()) <= 0 { - return nil, s.now() - } - - sort.Sort(s) - - return s.Jobs()[0], s.Jobs()[0].NextRun() -} - -// EveryRandom schedules a new period Job that runs at random intervals -// between the provided lower (inclusive) and upper (inclusive) bounds. -// The default unit is Seconds(). Call a different unit in the chain -// if you would like to change that. For example, Minutes(), Hours(), etc. -func (s *Scheduler) EveryRandom(lower, upper int) *Scheduler { - job := s.getCurrentJob() - - job.setRandomInterval(lower, upper) - return s -} - -// Every schedules a new periodic Job with an interval. -// Interval can be an int, time.Duration or a string that -// parses with time.ParseDuration(). -// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". -func (s *Scheduler) Every(interval interface{}) *Scheduler { - job := s.getCurrentJob() - - switch interval := interval.(type) { - case int: - job.interval = interval - if interval <= 0 { - job.error = wrapOrError(job.error, ErrInvalidInterval) - } - case time.Duration: - job.interval = 0 - job.setDuration(interval) - job.setUnit(duration) - case string: - d, err := time.ParseDuration(interval) - if err != nil { - job.error = wrapOrError(job.error, err) - } - job.setDuration(d) - job.setUnit(duration) - default: - job.error = wrapOrError(job.error, ErrInvalidIntervalType) - } - - return s -} - -func (s *Scheduler) run(job *Job) { - if !s.IsRunning() { - return - } - - job.mu.Lock() - - if job.function == nil { - job.mu.Unlock() - s.Remove(job) - return - } - - defer job.mu.Unlock() - - if job.runWithDetails { - switch len(job.parameters) { - case job.parametersLen: - job.parameters = append(job.parameters, job.copy()) - case job.parametersLen + 1: - job.parameters[job.parametersLen] = job.copy() - default: - // something is really wrong and we should never get here - job.error = wrapOrError(job.error, ErrInvalidFunctionParameters) - return - } - } - - s.executor.jobFunctions <- job.jobFunction.copy() -} - -func (s *Scheduler) runContinuous(job *Job) { - shouldRun, next := s.scheduleNextRun(job) - if !shouldRun { - return - } - - if !job.getStartsImmediately() { - job.setStartsImmediately(true) - } else { - s.run(job) - } - nr := next.dateTime.Sub(s.now()) - if nr < 0 { - job.setLastRun(s.now()) - shouldRun, next := s.scheduleNextRun(job) - if !shouldRun { - return - } - nr = next.dateTime.Sub(s.now()) - } - - job.setTimer(s.timer(nr, func() { - if !next.dateTime.IsZero() { - for { - n := s.now().UnixNano() - next.dateTime.UnixNano() - if n >= 0 { - break - } - select { - case <-s.executor.ctx.Done(): - case <-time.After(time.Duration(n)): - } - } - } - s.runContinuous(job) - })) -} - -// RunAll run all Jobs regardless if they are scheduled to run or not -func (s *Scheduler) RunAll() { - s.RunAllWithDelay(0) -} - -// RunAllWithDelay runs all jobs with the provided delay in between each job -func (s *Scheduler) RunAllWithDelay(d time.Duration) { - for _, job := range s.Jobs() { - s.run(job) - s.time.Sleep(d) - } -} - -// RunByTag runs all the jobs containing a specific tag -// regardless of whether they are scheduled to run or not -func (s *Scheduler) RunByTag(tag string) error { - return s.RunByTagWithDelay(tag, 0) -} - -// RunByTagWithDelay is same as RunByTag but introduces a delay between -// each job execution -func (s *Scheduler) RunByTagWithDelay(tag string, d time.Duration) error { - jobs, err := s.FindJobsByTag(tag) - if err != nil { - return err - } - for _, job := range jobs { - s.run(job) - s.time.Sleep(d) - } - return nil -} - -// Remove specific Job by function -// -// Removing a job stops that job's timer. However, if a job has already -// been started by the job's timer before being removed, the only way to stop -// it through gocron is to use DoWithJobDetails and access the job's Context which -// informs you when the job has been canceled. -// -// Alternatively, the job function would need to have implemented a means of -// stopping, e.g. using a context.WithCancel() passed as params to Do method. -// -// The above are based on what the underlying library suggests https://pkg.go.dev/time#Timer.Stop. -func (s *Scheduler) Remove(job interface{}) { - fName := getFunctionName(job) - j := s.findJobByTaskName(fName) - s.removeJobsUniqueTags(j) - s.removeByCondition(func(someJob *Job) bool { - return someJob.funcName == fName - }) -} - -// RemoveByReference removes specific Job by reference -func (s *Scheduler) RemoveByReference(job *Job) { - s.removeJobsUniqueTags(job) - s.removeByCondition(func(someJob *Job) bool { - job.mu.RLock() - defer job.mu.RUnlock() - return someJob == job - }) -} - -func (s *Scheduler) findJobByTaskName(name string) *Job { - for _, job := range s.Jobs() { - if job.funcName == name { - return job - } - } - return nil -} - -func (s *Scheduler) removeJobsUniqueTags(job *Job) { - if job == nil { - return - } - if s.tagsUnique && len(job.tags) > 0 { - for _, tag := range job.tags { - s.tags.Delete(tag) - } - } -} - -func (s *Scheduler) removeByCondition(shouldRemove func(*Job) bool) { - retainedJobs := make([]*Job, 0) - for _, job := range s.Jobs() { - if !shouldRemove(job) { - retainedJobs = append(retainedJobs, job) - } else { - job.stop() - } - } - s.setJobs(retainedJobs) -} - -// RemoveByTag will remove Jobs that match the given tag. -func (s *Scheduler) RemoveByTag(tag string) error { - return s.RemoveByTags(tag) -} - -// RemoveByTags will remove Jobs that match all given tags. -func (s *Scheduler) RemoveByTags(tags ...string) error { - jobs, err := s.FindJobsByTag(tags...) - if err != nil { - return err - } - - for _, job := range jobs { - s.RemoveByReference(job) - } - return nil -} - -// RemoveByTagsAny will remove Jobs that match any one of the given tags. -func (s *Scheduler) RemoveByTagsAny(tags ...string) error { - var errs error - mJob := make(map[*Job]struct{}) - for _, tag := range tags { - jobs, err := s.FindJobsByTag(tag) - if err != nil { - errs = wrapOrError(errs, fmt.Errorf("%s: %s", err.Error(), tag)) - } - for _, job := range jobs { - mJob[job] = struct{}{} - } - } - - for job := range mJob { - s.RemoveByReference(job) - } - - return errs -} - -// FindJobsByTag will return a slice of Jobs that match all given tags -func (s *Scheduler) FindJobsByTag(tags ...string) ([]*Job, error) { - var jobs []*Job - -Jobs: - for _, job := range s.Jobs() { - if job.hasTags(tags...) { - jobs = append(jobs, job) - continue Jobs - } - } - - if len(jobs) > 0 { - return jobs, nil - } - return nil, ErrJobNotFoundWithTag -} - -// MonthFirstWeekday sets the job to run the first specified weekday of the month -func (s *Scheduler) MonthFirstWeekday(weekday time.Weekday) *Scheduler { - _, month, day := s.time.Now(time.UTC).Date() - - if day < 7 { - return s.Cron(fmt.Sprintf("0 0 %d %d %d", day, month, weekday)) - } - - return s.Cron(fmt.Sprintf("0 0 %d %d %d", day, month+1, weekday)) -} - -// LimitRunsTo limits the number of executions of this job to n. -// Upon reaching the limit, the job is removed from the scheduler. -func (s *Scheduler) LimitRunsTo(i int) *Scheduler { - job := s.getCurrentJob() - job.LimitRunsTo(i) - return s -} - -// SingletonMode prevents a new job from starting if the prior job has not yet -// completed its run -// -// Warning: do not use this mode if your jobs will continue to stack -// up beyond the ability of the limit workers to keep up. An example of -// what NOT to do: -// -// s.Every("1s").SingletonMode().Do(func() { -// // this will result in an ever-growing number of goroutines -// // blocked trying to send to the buffered channel -// time.Sleep(10 * time.Minute) -// }) -func (s *Scheduler) SingletonMode() *Scheduler { - job := s.getCurrentJob() - job.SingletonMode() - return s -} - -// SingletonModeAll prevents new jobs from starting if the prior instance of the -// particular job has not yet completed its run -// -// Warning: do not use this mode if your jobs will continue to stack -// up beyond the ability of the limit workers to keep up. An example of -// what NOT to do: -// -// s := gocron.NewScheduler(time.UTC) -// s.SingletonModeAll() -// -// s.Every("1s").Do(func() { -// // this will result in an ever-growing number of goroutines -// // blocked trying to send to the buffered channel -// time.Sleep(10 * time.Minute) -// }) -func (s *Scheduler) SingletonModeAll() { - s.singletonMode = true -} - -// TaskPresent checks if specific job's function was added to the scheduler. -func (s *Scheduler) TaskPresent(j interface{}) bool { - for _, job := range s.Jobs() { - if job.funcName == getFunctionName(j) { - return true - } - } - return false -} - -// To avoid the recursive read lock on s.Jobs() and this function, -// creating this new function and distributing the lock between jobPresent, _jobPresent -func (s *Scheduler) _jobPresent(j *Job, jobs []*Job) bool { - s.jobsMutex.RLock() - defer s.jobsMutex.RUnlock() - for _, job := range jobs { - if job == j { - return true - } - } - return false -} - -func (s *Scheduler) jobPresent(j *Job) bool { - return s._jobPresent(j, s.Jobs()) -} - -// Clear clears all Jobs from this scheduler -func (s *Scheduler) Clear() { - for _, job := range s.Jobs() { - job.stop() - } - s.setJobs(make([]*Job, 0)) - // If unique tags was enabled, delete all the tags loaded in the tags sync.Map - if s.tagsUnique { - s.tags.Range(func(key interface{}, value interface{}) bool { - s.tags.Delete(key) - return true - }) - } -} - -// Stop stops the scheduler. This is a no-op if the scheduler is already stopped. -// It waits for all running jobs to finish before returning, so it is safe to assume that running jobs will finish when calling this. -func (s *Scheduler) Stop() { - if s.IsRunning() { - s.stop() - } -} - -func (s *Scheduler) stop() { - s.stopJobs(s.jobs) - s.executor.stop() - s.StopBlockingChan() - s.setRunning(false) -} - -func (s *Scheduler) stopJobs(jobs []*Job) { - for _, job := range jobs { - job.stop() - } -} - -func (s *Scheduler) doCommon(jobFun interface{}, params ...interface{}) (*Job, error) { - job := s.getCurrentJob() - s.inScheduleChain = false - - jobUnit := job.getUnit() - jobLastRun := job.LastRun() - if job.getAtTime(jobLastRun) != 0 && (jobUnit <= hours || jobUnit >= duration) { - job.error = wrapOrError(job.error, ErrAtTimeNotSupported) - } - - if len(job.scheduledWeekdays) != 0 && jobUnit != weeks { - job.error = wrapOrError(job.error, ErrWeekdayNotSupported) - } - - if job.unit != crontab && job.getInterval() == 0 { - if job.unit != duration { - job.error = wrapOrError(job.error, ErrInvalidInterval) - } - } - - if job.error != nil { - // delete the job from the scheduler as this job - // cannot be executed - s.RemoveByReference(job) - return nil, job.error - } - - typ := reflect.TypeOf(jobFun) - if typ.Kind() != reflect.Func { - // delete the job for the same reason as above - s.RemoveByReference(job) - return nil, ErrNotAFunction - } - - fname := getFunctionName(jobFun) - if job.funcName != fname { - job.function = jobFun - job.parameters = params - job.funcName = fname - } - - f := reflect.ValueOf(jobFun) - expectedParamLength := f.Type().NumIn() - if job.runWithDetails { - expectedParamLength-- - } - - if len(params) != expectedParamLength { - s.RemoveByReference(job) - job.error = wrapOrError(job.error, ErrWrongParams) - return nil, job.error - } - - if job.runWithDetails && f.Type().In(len(params)).Kind() != reflect.ValueOf(*job).Kind() { - s.RemoveByReference(job) - job.error = wrapOrError(job.error, ErrDoWithJobDetails) - return nil, job.error - } - - // we should not schedule if not running since we can't foresee how long it will take for the scheduler to start - if s.IsRunning() { - s.runContinuous(job) - } - - return job, nil -} - -// Do specifies the jobFunc that should be called every time the Job runs -func (s *Scheduler) Do(jobFun interface{}, params ...interface{}) (*Job, error) { - return s.doCommon(jobFun, params...) -} - -// DoWithJobDetails specifies the jobFunc that should be called every time the Job runs -// and additionally passes the details of the current job to the jobFunc. -// The last argument of the function must be a gocron.Job that will be passed by -// the scheduler when the function is called. -func (s *Scheduler) DoWithJobDetails(jobFun interface{}, params ...interface{}) (*Job, error) { - job := s.getCurrentJob() - job.runWithDetails = true - job.parametersLen = len(params) - return s.doCommon(jobFun, params...) -} - -// At schedules the Job at a specific time of day in the form "HH:MM:SS" or "HH:MM" -// or time.Time (note that only the hours, minutes, seconds and nanos are used). -func (s *Scheduler) At(i interface{}) *Scheduler { - job := s.getCurrentJob() - - switch t := i.(type) { - case string: - for _, tt := range strings.Split(t, ";") { - hour, min, sec, err := parseTime(tt) - if err != nil { - job.error = wrapOrError(job.error, err) - return s - } - // save atTime start as duration from midnight - job.addAtTime(time.Duration(hour)*time.Hour + time.Duration(min)*time.Minute + time.Duration(sec)*time.Second) - } - case time.Time: - job.addAtTime(time.Duration(t.Hour())*time.Hour + time.Duration(t.Minute())*time.Minute + time.Duration(t.Second())*time.Second + time.Duration(t.Nanosecond())*time.Nanosecond) - default: - job.error = wrapOrError(job.error, ErrUnsupportedTimeFormat) - } - job.startsImmediately = false - return s -} - -// Tag will add a tag when creating a job. -func (s *Scheduler) Tag(t ...string) *Scheduler { - job := s.getCurrentJob() - - if s.tagsUnique { - for _, tag := range t { - if _, ok := s.tags.Load(tag); ok { - job.error = wrapOrError(job.error, ErrTagsUnique(tag)) - return s - } - s.tags.Store(tag, struct{}{}) - } - } - - job.tags = append(job.tags, t...) - return s -} - -// GetAllTags returns all tags. -func (s *Scheduler) GetAllTags() []string { - var tags []string - for _, job := range s.Jobs() { - tags = append(tags, job.Tags()...) - } - return tags -} - -// StartAt schedules the next run of the Job. If this time is in the past, the configured interval will be used -// to calculate the next future time -func (s *Scheduler) StartAt(t time.Time) *Scheduler { - job := s.getCurrentJob() - job.setStartAtTime(t) - job.startsImmediately = false - return s -} - -// setUnit sets the unit type -func (s *Scheduler) setUnit(unit schedulingUnit) { - job := s.getCurrentJob() - currentUnit := job.getUnit() - if currentUnit == duration || currentUnit == crontab { - job.error = wrapOrError(job.error, ErrInvalidIntervalUnitsSelection) - return - } - job.setUnit(unit) -} - -// Millisecond sets the unit with seconds -func (s *Scheduler) Millisecond() *Scheduler { - return s.Milliseconds() -} - -// Milliseconds sets the unit with seconds -func (s *Scheduler) Milliseconds() *Scheduler { - s.setUnit(milliseconds) - return s -} - -// Second sets the unit with seconds -func (s *Scheduler) Second() *Scheduler { - return s.Seconds() -} - -// Seconds sets the unit with seconds -func (s *Scheduler) Seconds() *Scheduler { - s.setUnit(seconds) - return s -} - -// Minute sets the unit with minutes -func (s *Scheduler) Minute() *Scheduler { - return s.Minutes() -} - -// Minutes sets the unit with minutes -func (s *Scheduler) Minutes() *Scheduler { - s.setUnit(minutes) - return s -} - -// Hour sets the unit with hours -func (s *Scheduler) Hour() *Scheduler { - return s.Hours() -} - -// Hours sets the unit with hours -func (s *Scheduler) Hours() *Scheduler { - s.setUnit(hours) - return s -} - -// Day sets the unit with days -func (s *Scheduler) Day() *Scheduler { - s.setUnit(days) - return s -} - -// Days set the unit with days -func (s *Scheduler) Days() *Scheduler { - s.setUnit(days) - return s -} - -// Week sets the unit with weeks -func (s *Scheduler) Week() *Scheduler { - s.setUnit(weeks) - return s -} - -// Weeks sets the unit with weeks -func (s *Scheduler) Weeks() *Scheduler { - s.setUnit(weeks) - return s -} - -// Month sets the unit with months -func (s *Scheduler) Month(daysOfMonth ...int) *Scheduler { - return s.Months(daysOfMonth...) -} - -// MonthLastDay sets the unit with months at every last day of the month -func (s *Scheduler) MonthLastDay() *Scheduler { - return s.Months(-1) -} - -// Months sets the unit with months -// Note: Only days 1 through 28 are allowed for monthly schedules -// Note: Multiple add same days of month cannot be allowed -// Note: -1 is a special value and can only occur as single argument -func (s *Scheduler) Months(daysOfTheMonth ...int) *Scheduler { - job := s.getCurrentJob() - - if len(daysOfTheMonth) == 0 { - job.error = wrapOrError(job.error, ErrInvalidDayOfMonthEntry) - } else if len(daysOfTheMonth) == 1 { - dayOfMonth := daysOfTheMonth[0] - if dayOfMonth != -1 && (dayOfMonth < 1 || dayOfMonth > 28) { - job.error = wrapOrError(job.error, ErrInvalidDayOfMonthEntry) - } - } else { - - repeatMap := make(map[int]int) - for _, dayOfMonth := range daysOfTheMonth { - - if dayOfMonth < 1 || dayOfMonth > 28 { - job.error = wrapOrError(job.error, ErrInvalidDayOfMonthEntry) - break - } - - for _, dayOfMonthInJob := range job.daysOfTheMonth { - if dayOfMonthInJob == dayOfMonth { - job.error = wrapOrError(job.error, ErrInvalidDaysOfMonthDuplicateValue) - break - } - } - - if _, ok := repeatMap[dayOfMonth]; ok { - job.error = wrapOrError(job.error, ErrInvalidDaysOfMonthDuplicateValue) - break - } else { - repeatMap[dayOfMonth]++ - } - } - } - if job.daysOfTheMonth == nil { - job.daysOfTheMonth = make([]int, 0) - } - job.daysOfTheMonth = append(job.daysOfTheMonth, daysOfTheMonth...) - job.startsImmediately = false - s.setUnit(months) - return s -} - -// NOTE: If the dayOfTheMonth for the above two functions is -// more than the number of days in that month, the extra day(s) -// spill over to the next month. Similarly, if it's less than 0, -// it will go back to the month before - -// Weekday sets the scheduledWeekdays with a specifics weekdays -func (s *Scheduler) Weekday(weekDay time.Weekday) *Scheduler { - job := s.getCurrentJob() - - if in := in(job.scheduledWeekdays, weekDay); !in { - job.scheduledWeekdays = append(job.scheduledWeekdays, weekDay) - } - - job.startsImmediately = false - s.setUnit(weeks) - return s -} - -func (s *Scheduler) Midday() *Scheduler { - return s.At("12:00") -} - -// Monday sets the start day as Monday -func (s *Scheduler) Monday() *Scheduler { - return s.Weekday(time.Monday) -} - -// Tuesday sets the start day as Tuesday -func (s *Scheduler) Tuesday() *Scheduler { - return s.Weekday(time.Tuesday) -} - -// Wednesday sets the start day as Wednesday -func (s *Scheduler) Wednesday() *Scheduler { - return s.Weekday(time.Wednesday) -} - -// Thursday sets the start day as Thursday -func (s *Scheduler) Thursday() *Scheduler { - return s.Weekday(time.Thursday) -} - -// Friday sets the start day as Friday -func (s *Scheduler) Friday() *Scheduler { - return s.Weekday(time.Friday) -} - -// Saturday sets the start day as Saturday -func (s *Scheduler) Saturday() *Scheduler { - return s.Weekday(time.Saturday) -} - -// Sunday sets the start day as Sunday -func (s *Scheduler) Sunday() *Scheduler { - return s.Weekday(time.Sunday) -} - -func (s *Scheduler) getCurrentJob() *Job { - if !s.inScheduleChain { - s.jobsMutex.Lock() - s.jobs = append(s.jobs, s.newJob(0)) - s.jobsMutex.Unlock() - s.inScheduleChain = true - } - - s.jobsMutex.RLock() - defer s.jobsMutex.RUnlock() - - return s.jobs[len(s.jobs)-1] -} - -func (s *Scheduler) now() time.Time { - return s.time.Now(s.Location()) -} - -// TagsUnique forces job tags to be unique across the scheduler -// when adding tags with (s *Scheduler) Tag(). -// This does not enforce uniqueness on tags added via -// (j *Job) Tag() -func (s *Scheduler) TagsUnique() { - s.tagsUnique = true -} - -// Job puts the provided job in focus for the purpose -// of making changes to the job with the scheduler chain -// and finalized by calling Update() -func (s *Scheduler) Job(j *Job) *Scheduler { - jobs := s.Jobs() - for index, job := range jobs { - if job == j { - // the current job is always last, so put this job there - s.Swap(len(jobs)-1, index) - } - } - s.inScheduleChain = true - s.updateJob = true - return s -} - -// Update stops the job (if running) and starts it with any updates -// that were made to the job in the scheduler chain. Job() must be -// called first to put the given job in focus. -func (s *Scheduler) Update() (*Job, error) { - job := s.getCurrentJob() - - if !s.updateJob { - return job, wrapOrError(job.error, ErrUpdateCalledWithoutJob) - } - s.updateJob = false - job.stop() - job.setStartsImmediately(false) - - if job.runWithDetails { - return s.DoWithJobDetails(job.function, job.parameters...) - } - - if job.runConfig.mode == singletonMode { - job.SingletonMode() - } - - return s.Do(job.function, job.parameters...) -} - -func (s *Scheduler) Cron(cronExpression string) *Scheduler { - return s.cron(cronExpression, false) -} - -func (s *Scheduler) CronWithSeconds(cronExpression string) *Scheduler { - return s.cron(cronExpression, true) -} - -func (s *Scheduler) cron(cronExpression string, withSeconds bool) *Scheduler { - job := s.getCurrentJob() - - var withLocation string - if strings.HasPrefix(cronExpression, "TZ=") || strings.HasPrefix(cronExpression, "CRON_TZ=") { - withLocation = cronExpression - } else { - withLocation = fmt.Sprintf("CRON_TZ=%s %s", s.location.String(), cronExpression) - } - - var ( - cronSchedule cron.Schedule - err error - ) - - if withSeconds { - p := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) - cronSchedule, err = p.Parse(withLocation) - } else { - cronSchedule, err = cron.ParseStandard(withLocation) - } - - if err != nil { - job.error = wrapOrError(err, ErrCronParseFailure) - } - - job.cronSchedule = cronSchedule - job.setUnit(crontab) - job.startsImmediately = false - - return s -} - -func (s *Scheduler) newJob(interval int) *Job { - return newJob(interval, !s.waitForInterval, s.singletonMode) -} - -// WaitForScheduleAll defaults the scheduler to create all -// new jobs with the WaitForSchedule option as true. -// The jobs will not start immediately but rather will -// wait until their first scheduled interval. -func (s *Scheduler) WaitForScheduleAll() { - s.waitForInterval = true -} - -// WaitForSchedule sets the job to not start immediately -// but rather wait until the first scheduled interval. -func (s *Scheduler) WaitForSchedule() *Scheduler { - job := s.getCurrentJob() - job.startsImmediately = false - return s -} - -// StartImmediately sets the job to run immediately upon -// starting the scheduler or adding the job to a running -// scheduler. This overrides the jobs start status of any -// previously called methods in the chain. -// -// Note: This is the default behavior of the scheduler -// for most jobs, but is useful for overriding the default -// behavior of Cron scheduled jobs which default to -// WaitForSchedule. -func (s *Scheduler) StartImmediately() *Scheduler { - job := s.getCurrentJob() - job.startsImmediately = true - return s -} - -// CustomTime takes an in a struct that implements the TimeWrapper interface -// allowing the caller to mock the time used by the scheduler. This is useful -// for tests relying on gocron. -func (s *Scheduler) CustomTime(customTimeWrapper TimeWrapper) { - s.time = customTimeWrapper -} - -// CustomTimer takes in a function that mirrors the time.AfterFunc -// This is used to mock the time.AfterFunc function used by the scheduler -// for testing long intervals in a short amount of time. -func (s *Scheduler) CustomTimer(customTimer func(d time.Duration, f func()) *time.Timer) { - s.timer = customTimer -} - -func (s *Scheduler) StopBlockingChan() { - s.startBlockingStopChanMutex.Lock() - if s.IsRunning() && s.startBlockingStopChan != nil { - close(s.startBlockingStopChan) - } - s.startBlockingStopChanMutex.Unlock() -} - -// WithDistributedLocker prevents the same job from being run more than once -// when multiple schedulers are trying to schedule the same job. -// -// NOTE - This is currently in BETA. Please provide any feedback on your usage -// and open bugs with any issues. -// -// One strategy to reduce splay in the job execution times when using -// intervals (e.g. 1s, 1m, 1h), on each scheduler instance, is to use -// StartAt with time.Now().Round(interval) to start the job at the -// next interval boundary. -// -// Another strategy is to use the Cron or CronWithSeconds methods as they -// use the same behavior described above using StartAt. -// -// NOTE - the Locker will NOT lock jobs using the singleton options: -// SingletonMode, or SingletonModeAll -// -// NOTE - beware of potential race conditions when running the Locker -// with SetMaxConcurrentJobs and WaitMode as jobs are not guaranteed -// to be locked when each scheduler's is below its limit and able -// to run the job. -func (s *Scheduler) WithDistributedLocker(l Locker) { - s.executor.distributedLocker = l -} diff --git a/vendor/github.com/go-co-op/gocron/time_helper.go b/vendor/github.com/go-co-op/gocron/time_helper.go deleted file mode 100644 index 487a7a2a..00000000 --- a/vendor/github.com/go-co-op/gocron/time_helper.go +++ /dev/null @@ -1,33 +0,0 @@ -package gocron - -import "time" - -var _ TimeWrapper = (*trueTime)(nil) - -// TimeWrapper is an interface that wraps the Now, Sleep, and Unix methods of the time package. -// This allows the library and users to mock the time package for testing. -type TimeWrapper interface { - Now(*time.Location) time.Time - Unix(int64, int64) time.Time - Sleep(time.Duration) -} - -type trueTime struct{} - -func (t *trueTime) Now(location *time.Location) time.Time { - return time.Now().In(location) -} - -func (t *trueTime) Unix(sec int64, nsec int64) time.Time { - return time.Unix(sec, nsec) -} - -func (t *trueTime) Sleep(d time.Duration) { - time.Sleep(d) -} - -// afterFunc proxies the time.AfterFunc function. -// This allows it to be mocked for testing. -func afterFunc(d time.Duration, f func()) *time.Timer { - return time.AfterFunc(d, f) -} diff --git a/vendor/modules.txt b/vendor/modules.txt index decd10ef..ff8b7e40 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -75,9 +75,6 @@ github.com/gin-gonic/gin/binding github.com/gin-gonic/gin/internal/bytesconv github.com/gin-gonic/gin/internal/json github.com/gin-gonic/gin/render -# github.com/go-co-op/gocron v1.27.0 -## explicit; go 1.16 -github.com/go-co-op/gocron # github.com/go-logr/logr v1.2.4 ## explicit; go 1.16 github.com/go-logr/logr @@ -292,6 +289,8 @@ github.com/redis/go-redis/v9/internal/util # github.com/robfig/cron/v3 v3.0.1 ## explicit; go 1.12 github.com/robfig/cron/v3 +# github.com/rogpeppe/go-internal v1.8.1 +## explicit; go 1.16 # github.com/saracen/go7z v0.0.0-20191010121135-9c09b6bd7fda ## explicit github.com/saracen/go7z