133 changes: 119 additions & 14 deletions pkg/converter/convert_unix.go
@@ -36,6 +36,7 @@ import (
"github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"

"github.com/containerd/nydus-snapshotter/pkg/converter/tool"
@@ -564,12 +565,33 @@ func Merge(ctx context.Context, layers []Layer, dest io.Writer, opt MergeOption)
}
defer os.RemoveAll(workDir)

-	getBootstrapPath := func(layerIdx int) string {
+	getLayerPath := func(layerIdx int, suffix string) string {
+		if layerIdx < 0 || layerIdx >= len(layers) {
+			return ""
+		}
+
 		digestHex := layers[layerIdx].Digest.Hex()
-		if originalDigest := layers[layerIdx].OriginalDigest; originalDigest != nil {
-			return filepath.Join(workDir, originalDigest.Hex())
+		if suffix == "" && layers[layerIdx].OriginalDigest != nil {
+			digestHex = layers[layerIdx].OriginalDigest.Hex()
 		}
+		return filepath.Join(workDir, digestHex+suffix)
+	}
+
+	unpackLayerEntry := func(layerIdx int, entryName string, filePath string) error {
+		if layerIdx < 0 || layerIdx >= len(layers) {
+			return errors.Errorf("layer index %d out of bounds", layerIdx)
+		}
+
+		file, err := os.Create(filePath)
+		if err != nil {
+			return errors.Wrapf(err, "create %s file", entryName)
+		}
+		defer file.Close()
+
+		if _, err := UnpackEntry(layers[layerIdx].ReaderAt, entryName, file); err != nil {
+			return errors.Wrapf(err, "unpack %s", entryName)
+		}
-		return filepath.Join(workDir, digestHex)
+		return nil
}

eg, _ := errgroup.WithContext(ctx)
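For orientation, a minimal standalone sketch of the naming rule that the getLayerPath closure above implements: an empty suffix selects the bootstrap and prefers the original tar digest when one is recorded, while the .blob.meta and .blob.meta.header suffixes always key off the layer digest itself. The helper and the literal digest strings below are illustrative only, not part of this change.

package main

import (
	"fmt"
	"path/filepath"
)

// layerPath mirrors the path-naming rule of getLayerPath: the bootstrap
// (empty suffix) uses the original tar digest when present, suffixed
// entries always use the layer digest.
func layerPath(workDir, digestHex, originalHex, suffix string) string {
	if suffix == "" && originalHex != "" {
		digestHex = originalHex
	}
	return filepath.Join(workDir, digestHex+suffix)
}

func main() {
	fmt.Println(layerPath("/tmp/merge", "aaa111", "bbb222", ""))                  // /tmp/merge/bbb222
	fmt.Println(layerPath("/tmp/merge", "aaa111", "bbb222", ".blob.meta"))        // /tmp/merge/aaa111.blob.meta
	fmt.Println(layerPath("/tmp/merge", "aaa111", "bbb222", ".blob.meta.header")) // /tmp/merge/aaa111.blob.meta.header
}

In the real code the hex strings come from the OCI layer descriptors; plain strings stand in for them here.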
@@ -578,7 +600,7 @@ func Merge(ctx context.Context, layers []Layer, dest io.Writer, opt MergeOption)
rafsBlobSizes := []int64{}
rafsBlobTOCDigests := []string{}
for idx := range layers {
-		sourceBootstrapPaths = append(sourceBootstrapPaths, getBootstrapPath(idx))
+		sourceBootstrapPaths = append(sourceBootstrapPaths, getLayerPath(idx, ""))
if layers[idx].OriginalDigest != nil {
rafsBlobTOCDigest, err := calcBlobTOCDigest(layers[idx].ReaderAt)
if err != nil {
@@ -588,17 +610,22 @@ func Merge(ctx context.Context, layers []Layer, dest io.Writer, opt MergeOption)
rafsBlobDigests = append(rafsBlobDigests, layers[idx].Digest.Hex())
rafsBlobSizes = append(rafsBlobSizes, layers[idx].ReaderAt.Size())
}

eg.Go(func(idx int) func() error {
return func() error {
// Use the hex hash string of whole tar blob as the bootstrap name.
-				bootstrap, err := os.Create(getBootstrapPath(idx))
-				if err != nil {
-					return errors.Wrap(err, "create source bootstrap")
+				if err := unpackLayerEntry(idx, EntryBootstrap, getLayerPath(idx, "")); err != nil {
+					return err
 				}
-				defer bootstrap.Close()

-				if _, err := UnpackEntry(layers[idx].ReaderAt, EntryBootstrap, bootstrap); err != nil {
-					return errors.Wrap(err, "unpack nydus tar")
+				if opt.FsVersion == "6" {
+					if err := unpackLayerEntry(idx, EntryBlobMeta, getLayerPath(idx, ".blob.meta")); err != nil {
+						logrus.Warnf("Failed to extract blob.meta for layer %d: %v\n", idx, err)
+					}
+
+					if err := unpackLayerEntry(idx, EntryBlobMetaHeader, getLayerPath(idx, ".blob.meta.header")); err != nil {
+						logrus.Warnf("Failed to extract blob.meta.header for layer %d: %v\n", idx, err)
+					}
+				}

return nil
@@ -637,13 +664,92 @@ func Merge(ctx context.Context, layers []Layer, dest io.Writer, opt MergeOption)
}
defer bootstrapRa.Close()

-	files := append([]File{
+	files := []File{
 		{
 			Name:   EntryBootstrap,
 			Reader: content.NewReader(bootstrapRa),
 			Size:   bootstrapRa.Size(),
 		},
-	}, opt.AppendFiles...)
+	}

if opt.FsVersion == "6" {
metaRas := make([]io.Closer, 0, len(layers)*2)
defer func() {
for _, closer := range metaRas {
closer.Close()
}
}()

for idx := range layers {
digestHex := layers[idx].Digest.Hex()
blobMetaPath := getLayerPath(idx, ".blob.meta")
blobMetaHeaderPath := getLayerPath(idx, ".blob.meta.header")

metaContent, err := os.ReadFile(blobMetaPath)
if err != nil {
return nil, errors.Wrap(err, "read blob.meta")
}

headerContent, err := os.ReadFile(blobMetaHeaderPath)
if err != nil {
return nil, errors.Wrap(err, "read blob.meta.header")
}
uncompressedSize := len(metaContent)
alignedUncompressedSize := (uncompressedSize + 4095) &^ 4095
totalSize := alignedUncompressedSize + len(headerContent)

if totalSize == 0 {
logrus.Warnf("blob data for layer %s is empty, skipped\n", digestHex)
continue
}

assembledFileName := fmt.Sprintf("%s.blob.meta", digestHex)
assembledFilePath := filepath.Join(workDir, assembledFileName)

writeMetaFile := func() error {
f, err := os.Create(assembledFilePath)
if err != nil {
return err
}
defer f.Close()

if _, err := f.Write(metaContent); err != nil {
return err
}

if padding := alignedUncompressedSize - uncompressedSize; padding > 0 {
if _, err := f.Write(make([]byte, padding)); err != nil {
return err
}
}

if _, err := f.Write(headerContent); err != nil {
return err
}

return f.Sync()
}

if err := writeMetaFile(); err != nil {
return nil, errors.Wrap(err, "write blob meta file")
}

assembledRa, err := local.OpenReader(assembledFilePath)
if err != nil {
return nil, errors.Wrap(err, "open blob meta file")
}

metaRas = append(metaRas, assembledRa)

files = append(files, File{
Name: assembledFileName,
Reader: content.NewReader(assembledRa),
Size: int64(totalSize),
})
}
}

files = append(files, opt.AppendFiles...)
var rc io.ReadCloser

if opt.WithTar {
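The assembled <digest>.blob.meta file produced above is laid out as the uncompressed blob metadata, zero-padded up to a 4 KiB boundary, followed by the blob.meta.header contents; the Size recorded for the merged File entry is the padded length plus the header length. A small standalone sketch of that size arithmetic, with illustrative sizes:

package main

import "fmt"

// align4K rounds n up to the next multiple of 4096, matching the
// (uncompressedSize + 4095) &^ 4095 expression used above.
func align4K(n int) int {
	return (n + 4095) &^ 4095
}

func main() {
	metaLen, headerLen := 5000, 512 // illustrative sizes
	aligned := align4K(metaLen)     // 8192
	padding := aligned - metaLen    // 3192 zero bytes written between meta and header
	total := aligned + headerLen    // 8704, the Size recorded for the merged File entry
	fmt.Println(aligned, padding, total)
}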
@@ -661,7 +767,6 @@ func Merge(ctx context.Context, layers []Layer, dest io.Writer, opt MergeOption)
if _, err = io.CopyBuffer(dest, rc, *buffer); err != nil {
return nil, errors.Wrap(err, "copy merged bootstrap")
}

return blobDigests, nil
}
