Add generic set type (#21408)
This PR adds a generic set type to get rid of maps used as sets. Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
This commit is contained in:
parent
e84558b093
commit
0e57ff7eee
41 changed files with 328 additions and 324 deletions
|
@ -12,6 +12,7 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"code.gitea.io/gitea/modules/container"
|
||||
"code.gitea.io/gitea/modules/json"
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
)
|
||||
|
@ -33,7 +34,7 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
|
|||
type ChannelUniqueQueue struct {
|
||||
*WorkerPool
|
||||
lock sync.Mutex
|
||||
table map[string]bool
|
||||
table container.Set[string]
|
||||
shutdownCtx context.Context
|
||||
shutdownCtxCancel context.CancelFunc
|
||||
terminateCtx context.Context
|
||||
|
@ -58,7 +59,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
|
|||
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
|
||||
|
||||
queue := &ChannelUniqueQueue{
|
||||
table: map[string]bool{},
|
||||
table: make(container.Set[string]),
|
||||
shutdownCtx: shutdownCtx,
|
||||
shutdownCtxCancel: shutdownCtxCancel,
|
||||
terminateCtx: terminateCtx,
|
||||
|
@ -73,7 +74,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
|
|||
bs, _ := json.Marshal(datum)
|
||||
|
||||
queue.lock.Lock()
|
||||
delete(queue.table, string(bs))
|
||||
queue.table.Remove(string(bs))
|
||||
queue.lock.Unlock()
|
||||
|
||||
if u := handle(datum); u != nil {
|
||||
|
@ -127,16 +128,15 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
|
|||
q.lock.Unlock()
|
||||
}
|
||||
}()
|
||||
if _, ok := q.table[string(bs)]; ok {
|
||||
if !q.table.Add(string(bs)) {
|
||||
return ErrAlreadyInQueue
|
||||
}
|
||||
// FIXME: We probably need to implement some sort of limit here
|
||||
// If the downstream queue blocks this table will grow without limit
|
||||
q.table[string(bs)] = true
|
||||
if fn != nil {
|
||||
err := fn()
|
||||
if err != nil {
|
||||
delete(q.table, string(bs))
|
||||
q.table.Remove(string(bs))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -155,8 +155,7 @@ func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
|
|||
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
_, has := q.table[string(bs)]
|
||||
return has, nil
|
||||
return q.table.Contains(string(bs)), nil
|
||||
}
|
||||
|
||||
// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue