fix sending too little data in chunk upload body

This commit is contained in:
Jeffrey Morgan 2023-11-18 20:36:34 -05:00
parent ac5076ce1e
commit f6b317e8c9

View File

@ -123,19 +123,6 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
defer blobUploadManager.Delete(b.Digest) defer blobUploadManager.Delete(b.Digest)
ctx, b.CancelFunc = context.WithCancel(ctx) ctx, b.CancelFunc = context.WithCancel(ctx)
p, err := GetBlobsPath(b.Digest)
if err != nil {
b.err = err
return
}
f, err := os.Open(p)
if err != nil {
b.err = err
return
}
defer f.Close()
g, inner := errgroup.WithContext(ctx) g, inner := errgroup.WithContext(ctx)
g.SetLimit(numUploadParts) g.SetLimit(numUploadParts)
for i := range b.Parts { for i := range b.Parts {
@ -146,7 +133,6 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
g.Go(func() error { g.Go(func() error {
var err error var err error
for try := 0; try < maxRetries; try++ { for try := 0; try < maxRetries; try++ {
part.ReadSeeker = io.NewSectionReader(f, part.Offset, part.Size)
err = b.uploadChunk(inner, http.MethodPatch, requestURL, part, opts) err = b.uploadChunk(inner, http.MethodPatch, requestURL, part, opts)
switch { switch {
case errors.Is(err, context.Canceled): case errors.Is(err, context.Canceled):
@ -209,18 +195,27 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
} }
func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL *url.URL, part *blobUploadPart, opts *RegistryOptions) error { func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL *url.URL, part *blobUploadPart, opts *RegistryOptions) error {
part.Reset()
headers := make(http.Header) headers := make(http.Header)
headers.Set("Content-Type", "application/octet-stream") headers.Set("Content-Type", "application/octet-stream")
headers.Set("Content-Length", fmt.Sprintf("%d", part.Size)) headers.Set("Content-Length", fmt.Sprintf("%d", part.Size))
headers.Set("X-Redirect-Uploads", "1")
if method == http.MethodPatch { if method == http.MethodPatch {
headers.Set("X-Redirect-Uploads", "1")
headers.Set("Content-Range", fmt.Sprintf("%d-%d", part.Offset, part.Offset+part.Size-1)) headers.Set("Content-Range", fmt.Sprintf("%d-%d", part.Offset, part.Offset+part.Size-1))
} }
resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(part.ReadSeeker, io.MultiWriter(part, part.Hash)), opts) p, err := GetBlobsPath(b.Digest)
if err != nil {
return err
}
f, err := os.Open(p)
if err != nil {
return err
}
defer f.Close()
resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(io.NewSectionReader(f, part.Offset, part.Size), part), opts)
if err != nil { if err != nil {
return err return err
} }
@ -335,7 +330,6 @@ type blobUploadPart struct {
written int64 written int64
io.ReadSeeker
*blobUpload *blobUpload
} }
@ -343,14 +337,20 @@ func (p *blobUploadPart) Write(b []byte) (n int, err error) {
n = len(b) n = len(b)
p.written += int64(n) p.written += int64(n)
p.Completed.Add(int64(n)) p.Completed.Add(int64(n))
if p.Hash == nil {
p.Hash = md5.New()
}
p.Hash.Write(b)
return n, nil return n, nil
} }
func (p *blobUploadPart) Reset() { func (p *blobUploadPart) Reset() {
p.Seek(0, io.SeekStart)
p.Completed.Add(-int64(p.written)) p.Completed.Add(-int64(p.written))
p.written = 0 p.written = 0
p.Hash = md5.New() p.Hash.Reset()
} }
func uploadBlob(ctx context.Context, mp ModelPath, layer *Layer, opts *RegistryOptions, fn func(api.ProgressResponse)) error { func uploadBlob(ctx context.Context, mp ModelPath, layer *Layer, opts *RegistryOptions, fn func(api.ProgressResponse)) error {