Graceful: Xorm, RepoIndexer, Cron and Others (#9282)

* Change graceful to use a singleton obtained through GetManager instead of a global.
* Graceful: Make TestPullRequests shutdownable
* Graceful: Make the cron tasks graceful
* Graceful: AddTestPullRequest run in graceful ctx
* Graceful: SyncMirrors shutdown
* Graceful: SetDefaultContext for Xorm to be HammerContext
* Avoid starting graceful for migrate commands and checkout
* Graceful: DeliverHooks now can be shutdown
* Fix multiple syncing errors in modules/sync/UniqueQueue &  Make UniqueQueue closable
* Begin the process of making the repo indexer shutdown gracefully
This commit is contained in:
zeripath 2019-12-15 09:51:28 +00:00 committed by GitHub
parent 8bea92c3dc
commit e3c3b33ea7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
37 changed files with 628 additions and 287 deletions

View file

@ -6,6 +6,7 @@ package code
import (
"fmt"
"os"
"strconv"
"strings"
"time"
@ -34,10 +35,11 @@ func InitRepoIndexer() {
return
}
waitChannel := make(chan time.Duration)
// FIXME: graceful: This should use a persistable queue
repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength)
go func() {
start := time.Now()
log.Info("Initializing Repository Indexer")
log.Info("PID: %d: Initializing Repository Indexer", os.Getpid())
initRepoIndexer(populateRepoIndexerAsynchronously)
go processRepoIndexerOperationQueue()
waitChannel <- time.Since(start)
@ -45,7 +47,7 @@ func InitRepoIndexer() {
if setting.Indexer.StartupTimeout > 0 {
go func() {
timeout := setting.Indexer.StartupTimeout
if graceful.Manager.IsChild() && setting.GracefulHammerTime > 0 {
if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 {
timeout += setting.GracefulHammerTime
}
select {
@ -70,13 +72,6 @@ func populateRepoIndexerAsynchronously() error {
return nil
}
// if there is any existing repo indexer metadata in the DB, delete it
// since we are starting afresh. Also, xorm requires deletes to have a
// condition, and we want to delete everything, thus 1=1.
if err := models.DeleteAllRecords("repo_indexer_status"); err != nil {
return err
}
var maxRepoID int64
if maxRepoID, err = models.GetMaxID("repository"); err != nil {
return err
@ -87,44 +82,59 @@ func populateRepoIndexerAsynchronously() error {
// populateRepoIndexer populate the repo indexer with pre-existing data. This
// should only be run when the indexer is created for the first time.
// FIXME: graceful: This should use a persistable queue
func populateRepoIndexer(maxRepoID int64) {
log.Info("Populating the repo indexer with existing repositories")
isShutdown := graceful.GetManager().IsShutdown()
// start with the maximum existing repo ID and work backwards, so that we
// don't include repos that are created after gitea starts; such repos will
// already be added to the indexer, and we don't need to add them again.
for maxRepoID > 0 {
repos := make([]*models.Repository, 0, models.RepositoryListDefaultPageSize)
err := models.FindByMaxID(maxRepoID, models.RepositoryListDefaultPageSize, &repos)
select {
case <-isShutdown:
log.Info("Repository Indexer population shutdown before completion")
return
default:
}
ids, err := models.GetUnindexedRepos(maxRepoID, 0, 50)
if err != nil {
log.Error("populateRepoIndexer: %v", err)
return
} else if len(repos) == 0 {
} else if len(ids) == 0 {
break
}
for _, repo := range repos {
for _, id := range ids {
select {
case <-isShutdown:
log.Info("Repository Indexer population shutdown before completion")
return
default:
}
repoIndexerOperationQueue <- repoIndexerOperation{
repoID: repo.ID,
repoID: id,
deleted: false,
}
maxRepoID = repo.ID - 1
maxRepoID = id - 1
}
}
log.Info("Done populating the repo indexer with existing repositories")
log.Info("Done (re)populating the repo indexer with existing repositories")
}
func updateRepoIndexer(repoID int64) error {
repo, err := models.GetRepositoryByID(repoID)
if err != nil {
return err
return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepositoryByID: %d, Error: %v", repoID, err)
}
sha, err := getDefaultBranchSha(repo)
if err != nil {
return err
return fmt.Errorf("UpdateRepoIndexer: Unable to GetDefaultBranchSha for: %s/%s, Error: %v", repo.MustOwnerName(), repo.Name, err)
}
changes, err := getRepoChanges(repo, sha)
if err != nil {
return err
return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepoChanges for: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err)
} else if changes == nil {
return nil
}
@ -132,16 +142,16 @@ func updateRepoIndexer(repoID int64) error {
batch := RepoIndexerBatch()
for _, update := range changes.Updates {
if err := addUpdate(update, repo, batch); err != nil {
return err
return fmt.Errorf("UpdateRepoIndexer: Unable to addUpdate to: %s/%s Sha: %s, update: %s(%s) Error: %v", repo.MustOwnerName(), repo.Name, sha, update.Filename, update.BlobSha, err)
}
}
for _, filename := range changes.RemovedFilenames {
if err := addDelete(filename, repo, batch); err != nil {
return err
return fmt.Errorf("UpdateRepoIndexer: Unable to addDelete to: %s/%s Sha: %s, filename: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, filename, err)
}
}
if err = batch.Flush(); err != nil {
return err
return fmt.Errorf("UpdateRepoIndexer: Unable to flush batch to indexer for repo: %s/%s Error: %v", repo.MustOwnerName(), repo.Name, err)
}
return repo.UpdateIndexerStatus(sha)
}
@ -322,20 +332,26 @@ func nonGenesisChanges(repo *models.Repository, revision string) (*repoChanges,
func processRepoIndexerOperationQueue() {
for {
op := <-repoIndexerOperationQueue
var err error
if op.deleted {
if err = deleteRepoFromIndexer(op.repoID); err != nil {
log.Error("deleteRepoFromIndexer: %v", err)
select {
case op := <-repoIndexerOperationQueue:
var err error
if op.deleted {
if err = deleteRepoFromIndexer(op.repoID); err != nil {
log.Error("DeleteRepoFromIndexer: %v", err)
}
} else {
if err = updateRepoIndexer(op.repoID); err != nil {
log.Error("updateRepoIndexer: %v", err)
}
}
} else {
if err = updateRepoIndexer(op.repoID); err != nil {
log.Error("updateRepoIndexer: %v", err)
for _, watcher := range op.watchers {
watcher <- err
}
case <-graceful.GetManager().IsShutdown():
log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid())
return
}
for _, watcher := range op.watchers {
watcher <- err
}
}
}

View file

@ -5,9 +5,13 @@
package code
import (
"context"
"os"
"strings"
"sync"
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
@ -104,21 +108,50 @@ func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch)
func initRepoIndexer(populateIndexer func() error) {
indexer, err := openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion)
if err != nil {
log.Fatal("InitRepoIndexer: %v", err)
log.Fatal("InitRepoIndexer %s: %v", setting.Indexer.RepoPath, err)
}
if indexer != nil {
indexerHolder.set(indexer)
closeAtTerminate()
// Continue population from where left off
if err = populateIndexer(); err != nil {
log.Fatal("PopulateRepoIndex: %v", err)
}
return
}
if err = createRepoIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion); err != nil {
log.Fatal("CreateRepoIndexer: %v", err)
}
closeAtTerminate()
// if there is any existing repo indexer metadata in the DB, delete it
// since we are starting afresh. Also, xorm requires deletes to have a
// condition, and we want to delete everything, thus 1=1.
if err := models.DeleteAllRecords("repo_indexer_status"); err != nil {
log.Fatal("DeleteAllRepoIndexerStatus: %v", err)
}
if err = populateIndexer(); err != nil {
log.Fatal("PopulateRepoIndex: %v", err)
}
}
func closeAtTerminate() {
graceful.GetManager().RunAtTerminate(context.Background(), func() {
log.Debug("Closing repo indexer")
indexer := indexerHolder.get()
if indexer != nil {
err := indexer.Close()
if err != nil {
log.Error("Error whilst closing the repository indexer: %v", err)
}
}
log.Info("PID: %d Repository Indexer closed", os.Getpid())
})
}
// createRepoIndexer create a repo indexer if one does not already exist
func createRepoIndexer(path string, latestVersion int) error {
docMapping := bleve.NewDocumentMapping()