Update Go-Git to take advantage of LargeObjectThreshold (#16316)

Following the merging of https://github.com/go-git/go-git/pull/330 we
can now add a setting to avoid go-git reading and caching large objects.

Signed-off-by: Andrew Thornton <art27@cantab.net>
This commit is contained in:
zeripath 2021-06-30 21:58:45 +01:00 committed by GitHub
parent 4f26e0ac0e
commit 9979983283
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 507 additions and 45 deletions

View file

@ -7,19 +7,21 @@ import (
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/cache"
"github.com/go-git/go-git/v5/plumbing/format/idxfile"
"github.com/go-git/go-git/v5/utils/ioutil"
)
// FSObject is an object from the packfile on the filesystem.
type FSObject struct {
hash plumbing.Hash
h *ObjectHeader
offset int64
size int64
typ plumbing.ObjectType
index idxfile.Index
fs billy.Filesystem
path string
cache cache.Object
hash plumbing.Hash
h *ObjectHeader
offset int64
size int64
typ plumbing.ObjectType
index idxfile.Index
fs billy.Filesystem
path string
cache cache.Object
largeObjectThreshold int64
}
// NewFSObject creates a new filesystem object.
@ -32,16 +34,18 @@ func NewFSObject(
fs billy.Filesystem,
path string,
cache cache.Object,
largeObjectThreshold int64,
) *FSObject {
return &FSObject{
hash: hash,
offset: offset,
size: contentSize,
typ: finalType,
index: index,
fs: fs,
path: path,
cache: cache,
hash: hash,
offset: offset,
size: contentSize,
typ: finalType,
index: index,
fs: fs,
path: path,
cache: cache,
largeObjectThreshold: largeObjectThreshold,
}
}
@ -62,7 +66,21 @@ func (o *FSObject) Reader() (io.ReadCloser, error) {
return nil, err
}
p := NewPackfileWithCache(o.index, nil, f, o.cache)
p := NewPackfileWithCache(o.index, nil, f, o.cache, o.largeObjectThreshold)
if o.largeObjectThreshold > 0 && o.size > o.largeObjectThreshold {
// We have a big object
h, err := p.objectHeaderAtOffset(o.offset)
if err != nil {
return nil, err
}
r, err := p.getReaderDirect(h)
if err != nil {
_ = f.Close()
return nil, err
}
return ioutil.NewReadCloserWithCloser(r, f.Close), nil
}
r, err := p.getObjectContent(o.offset)
if err != nil {
_ = f.Close()

View file

@ -2,6 +2,8 @@ package packfile
import (
"bytes"
"compress/zlib"
"fmt"
"io"
"os"
@ -35,11 +37,12 @@ const smallObjectThreshold = 16 * 1024
// Packfile allows retrieving information from inside a packfile.
type Packfile struct {
idxfile.Index
fs billy.Filesystem
file billy.File
s *Scanner
deltaBaseCache cache.Object
offsetToType map[int64]plumbing.ObjectType
fs billy.Filesystem
file billy.File
s *Scanner
deltaBaseCache cache.Object
offsetToType map[int64]plumbing.ObjectType
largeObjectThreshold int64
}
// NewPackfileWithCache creates a new Packfile with the given object cache.
@ -50,6 +53,7 @@ func NewPackfileWithCache(
fs billy.Filesystem,
file billy.File,
cache cache.Object,
largeObjectThreshold int64,
) *Packfile {
s := NewScanner(file)
return &Packfile{
@ -59,6 +63,7 @@ func NewPackfileWithCache(
s,
cache,
make(map[int64]plumbing.ObjectType),
largeObjectThreshold,
}
}
@ -66,8 +71,8 @@ func NewPackfileWithCache(
// and packfile idx.
// If the filesystem is provided, the packfile will return FSObjects, otherwise
// it will return MemoryObjects.
func NewPackfile(index idxfile.Index, fs billy.Filesystem, file billy.File) *Packfile {
return NewPackfileWithCache(index, fs, file, cache.NewObjectLRUDefault())
func NewPackfile(index idxfile.Index, fs billy.Filesystem, file billy.File, largeObjectThreshold int64) *Packfile {
return NewPackfileWithCache(index, fs, file, cache.NewObjectLRUDefault(), largeObjectThreshold)
}
// Get retrieves the encoded object in the packfile with the given hash.
@ -263,6 +268,7 @@ func (p *Packfile) getNextObject(h *ObjectHeader, hash plumbing.Hash) (plumbing.
p.fs,
p.file.Name(),
p.deltaBaseCache,
p.largeObjectThreshold,
), nil
}
@ -282,6 +288,50 @@ func (p *Packfile) getObjectContent(offset int64) (io.ReadCloser, error) {
return obj.Reader()
}
func asyncReader(p *Packfile) (io.ReadCloser, error) {
reader := ioutil.NewReaderUsingReaderAt(p.file, p.s.r.offset)
zr := zlibReaderPool.Get().(io.ReadCloser)
if err := zr.(zlib.Resetter).Reset(reader, nil); err != nil {
return nil, fmt.Errorf("zlib reset error: %s", err)
}
return ioutil.NewReadCloserWithCloser(zr, func() error {
zlibReaderPool.Put(zr)
return nil
}), nil
}
func (p *Packfile) getReaderDirect(h *ObjectHeader) (io.ReadCloser, error) {
switch h.Type {
case plumbing.CommitObject, plumbing.TreeObject, plumbing.BlobObject, plumbing.TagObject:
return asyncReader(p)
case plumbing.REFDeltaObject:
deltaRc, err := asyncReader(p)
if err != nil {
return nil, err
}
r, err := p.readREFDeltaObjectContent(h, deltaRc)
if err != nil {
return nil, err
}
return r, nil
case plumbing.OFSDeltaObject:
deltaRc, err := asyncReader(p)
if err != nil {
return nil, err
}
r, err := p.readOFSDeltaObjectContent(h, deltaRc)
if err != nil {
return nil, err
}
return r, nil
default:
return nil, ErrInvalidObject.AddDetails("type %q", h.Type)
}
}
func (p *Packfile) getNextMemoryObject(h *ObjectHeader) (plumbing.EncodedObject, error) {
var obj = new(plumbing.MemoryObject)
obj.SetSize(h.Length)
@ -334,6 +384,20 @@ func (p *Packfile) fillREFDeltaObjectContent(obj plumbing.EncodedObject, ref plu
return p.fillREFDeltaObjectContentWithBuffer(obj, ref, buf)
}
func (p *Packfile) readREFDeltaObjectContent(h *ObjectHeader, deltaRC io.Reader) (io.ReadCloser, error) {
var err error
base, ok := p.cacheGet(h.Reference)
if !ok {
base, err = p.Get(h.Reference)
if err != nil {
return nil, err
}
}
return ReaderFromDelta(base, deltaRC)
}
func (p *Packfile) fillREFDeltaObjectContentWithBuffer(obj plumbing.EncodedObject, ref plumbing.Hash, buf *bytes.Buffer) error {
var err error
@ -364,6 +428,20 @@ func (p *Packfile) fillOFSDeltaObjectContent(obj plumbing.EncodedObject, offset
return p.fillOFSDeltaObjectContentWithBuffer(obj, offset, buf)
}
func (p *Packfile) readOFSDeltaObjectContent(h *ObjectHeader, deltaRC io.Reader) (io.ReadCloser, error) {
hash, err := p.FindHash(h.OffsetReference)
if err != nil {
return nil, err
}
base, err := p.objectAtOffset(h.OffsetReference, hash)
if err != nil {
return nil, err
}
return ReaderFromDelta(base, deltaRC)
}
func (p *Packfile) fillOFSDeltaObjectContentWithBuffer(obj plumbing.EncodedObject, offset int64, buf *bytes.Buffer) error {
hash, err := p.FindHash(offset)
if err != nil {

View file

@ -1,9 +1,11 @@
package packfile
import (
"bufio"
"bytes"
"errors"
"io"
"math"
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/utils/ioutil"
@ -73,6 +75,131 @@ func PatchDelta(src, delta []byte) ([]byte, error) {
return b.Bytes(), nil
}
func ReaderFromDelta(base plumbing.EncodedObject, deltaRC io.Reader) (io.ReadCloser, error) {
deltaBuf := bufio.NewReaderSize(deltaRC, 1024)
srcSz, err := decodeLEB128ByteReader(deltaBuf)
if err != nil {
if err == io.EOF {
return nil, ErrInvalidDelta
}
return nil, err
}
if srcSz != uint(base.Size()) {
return nil, ErrInvalidDelta
}
targetSz, err := decodeLEB128ByteReader(deltaBuf)
if err != nil {
if err == io.EOF {
return nil, ErrInvalidDelta
}
return nil, err
}
remainingTargetSz := targetSz
dstRd, dstWr := io.Pipe()
go func() {
baseRd, err := base.Reader()
if err != nil {
_ = dstWr.CloseWithError(ErrInvalidDelta)
return
}
defer baseRd.Close()
baseBuf := bufio.NewReader(baseRd)
basePos := uint(0)
for {
cmd, err := deltaBuf.ReadByte()
if err == io.EOF {
_ = dstWr.CloseWithError(ErrInvalidDelta)
return
}
if err != nil {
_ = dstWr.CloseWithError(err)
return
}
if isCopyFromSrc(cmd) {
offset, err := decodeOffsetByteReader(cmd, deltaBuf)
if err != nil {
_ = dstWr.CloseWithError(err)
return
}
sz, err := decodeSizeByteReader(cmd, deltaBuf)
if err != nil {
_ = dstWr.CloseWithError(err)
return
}
if invalidSize(sz, targetSz) ||
invalidOffsetSize(offset, sz, srcSz) {
_ = dstWr.Close()
return
}
discard := offset - basePos
if basePos > offset {
_ = baseRd.Close()
baseRd, err = base.Reader()
if err != nil {
_ = dstWr.CloseWithError(ErrInvalidDelta)
return
}
baseBuf.Reset(baseRd)
discard = offset
}
for discard > math.MaxInt32 {
n, err := baseBuf.Discard(math.MaxInt32)
if err != nil {
_ = dstWr.CloseWithError(err)
return
}
basePos += uint(n)
discard -= uint(n)
}
for discard > 0 {
n, err := baseBuf.Discard(int(discard))
if err != nil {
_ = dstWr.CloseWithError(err)
return
}
basePos += uint(n)
discard -= uint(n)
}
if _, err := io.Copy(dstWr, io.LimitReader(baseBuf, int64(sz))); err != nil {
_ = dstWr.CloseWithError(err)
return
}
remainingTargetSz -= sz
basePos += sz
} else if isCopyFromDelta(cmd) {
sz := uint(cmd) // cmd is the size itself
if invalidSize(sz, targetSz) {
_ = dstWr.CloseWithError(ErrInvalidDelta)
return
}
if _, err := io.Copy(dstWr, io.LimitReader(deltaBuf, int64(sz))); err != nil {
_ = dstWr.CloseWithError(err)
return
}
remainingTargetSz -= sz
} else {
_ = dstWr.CloseWithError(ErrDeltaCmd)
return
}
if remainingTargetSz <= 0 {
_ = dstWr.Close()
return
}
}
}()
return dstRd, nil
}
func patchDelta(dst *bytes.Buffer, src, delta []byte) error {
if len(delta) < deltaSizeMin {
return ErrInvalidDelta
@ -161,6 +288,25 @@ func decodeLEB128(input []byte) (uint, []byte) {
return num, input[sz:]
}
func decodeLEB128ByteReader(input io.ByteReader) (uint, error) {
var num, sz uint
for {
b, err := input.ReadByte()
if err != nil {
return 0, err
}
num |= (uint(b) & payload) << (sz * 7) // concats 7 bits chunks
sz++
if uint(b)&continuation == 0 {
break
}
}
return num, nil
}
const (
payload = 0x7f // 0111 1111
continuation = 0x80 // 1000 0000
@ -174,6 +320,40 @@ func isCopyFromDelta(cmd byte) bool {
return (cmd&0x80) == 0 && cmd != 0
}
func decodeOffsetByteReader(cmd byte, delta io.ByteReader) (uint, error) {
var offset uint
if (cmd & 0x01) != 0 {
next, err := delta.ReadByte()
if err != nil {
return 0, err
}
offset = uint(next)
}
if (cmd & 0x02) != 0 {
next, err := delta.ReadByte()
if err != nil {
return 0, err
}
offset |= uint(next) << 8
}
if (cmd & 0x04) != 0 {
next, err := delta.ReadByte()
if err != nil {
return 0, err
}
offset |= uint(next) << 16
}
if (cmd & 0x08) != 0 {
next, err := delta.ReadByte()
if err != nil {
return 0, err
}
offset |= uint(next) << 24
}
return offset, nil
}
func decodeOffset(cmd byte, delta []byte) (uint, []byte, error) {
var offset uint
if (cmd & 0x01) != 0 {
@ -208,6 +388,36 @@ func decodeOffset(cmd byte, delta []byte) (uint, []byte, error) {
return offset, delta, nil
}
func decodeSizeByteReader(cmd byte, delta io.ByteReader) (uint, error) {
var sz uint
if (cmd & 0x10) != 0 {
next, err := delta.ReadByte()
if err != nil {
return 0, err
}
sz = uint(next)
}
if (cmd & 0x20) != 0 {
next, err := delta.ReadByte()
if err != nil {
return 0, err
}
sz |= uint(next) << 8
}
if (cmd & 0x40) != 0 {
next, err := delta.ReadByte()
if err != nil {
return 0, err
}
sz |= uint(next) << 16
}
if sz == 0 {
sz = 0x10000
}
return sz, nil
}
func decodeSize(cmd byte, delta []byte) (uint, []byte, error) {
var sz uint
if (cmd & 0x10) != 0 {

View file

@ -320,6 +320,21 @@ func (s *Scanner) NextObject(w io.Writer) (written int64, crc32 uint32, err erro
return
}
// ReadObject returns a reader for the object content and an error
func (s *Scanner) ReadObject() (io.ReadCloser, error) {
s.pendingObject = nil
zr := zlibReaderPool.Get().(io.ReadCloser)
if err := zr.(zlib.Resetter).Reset(s.r, nil); err != nil {
return nil, fmt.Errorf("zlib reset error: %s", err)
}
return ioutil.NewReadCloserWithCloser(zr, func() error {
zlibReaderPool.Put(zr)
return nil
}), nil
}
// ReadRegularObject reads and write a non-deltified object
// from it zlib stream in an object entry in the packfile.
func (s *Scanner) copyObject(w io.Writer) (n int64, err error) {

View file

@ -0,0 +1,79 @@
package dotgit
import (
"fmt"
"io"
"os"
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/format/objfile"
"github.com/go-git/go-git/v5/utils/ioutil"
)
var _ (plumbing.EncodedObject) = &EncodedObject{}
type EncodedObject struct {
dir *DotGit
h plumbing.Hash
t plumbing.ObjectType
sz int64
}
func (e *EncodedObject) Hash() plumbing.Hash {
return e.h
}
func (e *EncodedObject) Reader() (io.ReadCloser, error) {
f, err := e.dir.Object(e.h)
if err != nil {
if os.IsNotExist(err) {
return nil, plumbing.ErrObjectNotFound
}
return nil, err
}
r, err := objfile.NewReader(f)
if err != nil {
return nil, err
}
t, size, err := r.Header()
if err != nil {
_ = r.Close()
return nil, err
}
if t != e.t {
_ = r.Close()
return nil, objfile.ErrHeader
}
if size != e.sz {
_ = r.Close()
return nil, objfile.ErrHeader
}
return ioutil.NewReadCloserWithCloser(r, f.Close), nil
}
func (e *EncodedObject) SetType(plumbing.ObjectType) {}
func (e *EncodedObject) Type() plumbing.ObjectType {
return e.t
}
func (e *EncodedObject) Size() int64 {
return e.sz
}
func (e *EncodedObject) SetSize(int64) {}
func (e *EncodedObject) Writer() (io.WriteCloser, error) {
return nil, fmt.Errorf("Not supported")
}
func NewEncodedObject(dir *DotGit, h plumbing.Hash, t plumbing.ObjectType, size int64) *EncodedObject {
return &EncodedObject{
dir: dir,
h: h,
t: t,
sz: size,
}
}

View file

@ -204,9 +204,9 @@ func (s *ObjectStorage) packfile(idx idxfile.Index, pack plumbing.Hash) (*packfi
var p *packfile.Packfile
if s.objectCache != nil {
p = packfile.NewPackfileWithCache(idx, s.dir.Fs(), f, s.objectCache)
p = packfile.NewPackfileWithCache(idx, s.dir.Fs(), f, s.objectCache, s.options.LargeObjectThreshold)
} else {
p = packfile.NewPackfile(idx, s.dir.Fs(), f)
p = packfile.NewPackfile(idx, s.dir.Fs(), f, s.options.LargeObjectThreshold)
}
return p, s.storePackfileInCache(pack, p)
@ -389,7 +389,6 @@ func (s *ObjectStorage) getFromUnpacked(h plumbing.Hash) (obj plumbing.EncodedOb
return cacheObj, nil
}
obj = s.NewEncodedObject()
r, err := objfile.NewReader(f)
if err != nil {
return nil, err
@ -402,6 +401,13 @@ func (s *ObjectStorage) getFromUnpacked(h plumbing.Hash) (obj plumbing.EncodedOb
return nil, err
}
if s.options.LargeObjectThreshold > 0 && size > s.options.LargeObjectThreshold {
obj = dotgit.NewEncodedObject(s.dir, h, t, size)
return obj, nil
}
obj = s.NewEncodedObject()
obj.SetType(t)
obj.SetSize(size)
w, err := obj.Writer()
@ -595,6 +601,7 @@ func (s *ObjectStorage) buildPackfileIters(
return newPackfileIter(
s.dir.Fs(), pack, t, seen, s.index[h],
s.objectCache, s.options.KeepDescriptors,
s.options.LargeObjectThreshold,
)
},
}, nil
@ -684,6 +691,7 @@ func NewPackfileIter(
idxFile billy.File,
t plumbing.ObjectType,
keepPack bool,
largeObjectThreshold int64,
) (storer.EncodedObjectIter, error) {
idx := idxfile.NewMemoryIndex()
if err := idxfile.NewDecoder(idxFile).Decode(idx); err != nil {
@ -695,7 +703,7 @@ func NewPackfileIter(
}
seen := make(map[plumbing.Hash]struct{})
return newPackfileIter(fs, f, t, seen, idx, nil, keepPack)
return newPackfileIter(fs, f, t, seen, idx, nil, keepPack, largeObjectThreshold)
}
func newPackfileIter(
@ -706,12 +714,13 @@ func newPackfileIter(
index idxfile.Index,
cache cache.Object,
keepPack bool,
largeObjectThreshold int64,
) (storer.EncodedObjectIter, error) {
var p *packfile.Packfile
if cache != nil {
p = packfile.NewPackfileWithCache(index, fs, f, cache)
p = packfile.NewPackfileWithCache(index, fs, f, cache, largeObjectThreshold)
} else {
p = packfile.NewPackfile(index, fs, f)
p = packfile.NewPackfile(index, fs, f, largeObjectThreshold)
}
iter, err := p.GetByType(t)

View file

@ -34,6 +34,9 @@ type Options struct {
// MaxOpenDescriptors is the max number of file descriptors to keep
// open. If KeepDescriptors is true, all file descriptors will remain open.
MaxOpenDescriptors int
// LargeObjectThreshold maximum object size (in bytes) that will be read in to memory.
// If left unset or set to 0 there is no limit
LargeObjectThreshold int64
}
// NewStorage returns a new Storage backed by a given `fs.Filesystem` and cache.

View file

@ -55,6 +55,28 @@ func NewReadCloser(r io.Reader, c io.Closer) io.ReadCloser {
return &readCloser{Reader: r, closer: c}
}
type readCloserCloser struct {
io.ReadCloser
closer func() error
}
func (r *readCloserCloser) Close() (err error) {
defer func() {
if err == nil {
err = r.closer()
return
}
_ = r.closer()
}()
return r.ReadCloser.Close()
}
// NewReadCloserWithCloser creates an `io.ReadCloser` with the given `io.ReaderCloser` and
// `io.Closer` that ensures that the closer is closed on close
func NewReadCloserWithCloser(r io.ReadCloser, c func() error) io.ReadCloser {
return &readCloserCloser{ReadCloser: r, closer: c}
}
type writeCloser struct {
io.Writer
closer io.Closer
@ -82,6 +104,24 @@ func WriteNopCloser(w io.Writer) io.WriteCloser {
return writeNopCloser{w}
}
type readerAtAsReader struct {
io.ReaderAt
offset int64
}
func (r *readerAtAsReader) Read(bs []byte) (int, error) {
n, err := r.ReaderAt.ReadAt(bs, r.offset)
r.offset += int64(n)
return n, err
}
func NewReaderUsingReaderAt(r io.ReaderAt, offset int64) io.Reader {
return &readerAtAsReader{
ReaderAt: r,
offset: offset,
}
}
// CheckClose calls Close on the given io.Closer. If the given *error points to
// nil, it will be assigned the error returned by Close. Otherwise, any error
// returned by Close will be ignored. CheckClose is usually called with defer.