13 changes: 13 additions & 0 deletions README.md
@@ -483,6 +483,17 @@ src > dst | src = dst | ❌
src <= dst | src != dst | ✅
src <= dst | src == dst | ❌

###### Hash only
With the `--hash-only` flag, `sync` uses a strategy that compares only file sizes and hashes. The source is treated as the **source of truth**, and any difference in size or hash causes `s5cmd` to copy the source object to the destination. Objects uploaded via multipart upload are always synced, since a multipart ETag is not a plain MD5 digest and can never match a locally calculated hash.

The hash can be read from the object's remote metadata (its ETag) or calculated locally. When `s5cmd` has to calculate the hash of a local file, it must read the entire file, which can be expensive. To run these calculations in parallel, `sync` reuses the `numworkers` flag: as many worker goroutines are created to calculate hashes as `numworkers` specifies.

hash | size | should sync
------------|--------------|-------------
src != dst | src == dst | ✅
src != dst | src != dst | ✅
src == dst | src == dst | ❌
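
As a rough illustration, the decision in this table reduces to the following sketch (illustrative names only; the real implementation is `HashStrategy.ShouldSync` in `command/sync_strategy.go`):

```go
package main

import "fmt"

// shouldSync mirrors the --hash-only decision table: sync when sizes differ
// or when the MD5 hashes differ; skip only when both match.
func shouldSync(srcSize, dstSize int64, srcHash, dstHash string) bool {
	if srcSize != dstSize {
		return true // a size mismatch always triggers a sync
	}
	// A multipart-upload ETag looks like "<md5>-<parts>", so it never
	// equals a plain MD5 digest and such objects always sync.
	return srcHash != dstHash
}

func main() {
	fmt.Println(shouldSync(10, 10, "abc", "abc"))   // false: skip, hashes match
	fmt.Println(shouldSync(10, 10, "abc", "abc-2")) // true: multipart ETag differs
}
```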

### Dry run
`--dry-run` flag will output what operations will be performed without actually
carrying out those operations.
@@ -674,6 +685,8 @@ For example, if you are uploading 100 files to an S3 bucket and the `--numworker
s5cmd --numworkers 10 cp '/Users/foo/bar/*' s3://mybucket/foo/bar/
```

Additionally, this flag sets the number of workers used to calculate hashes when running the `sync` operation with the `--hash-only` flag.
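
A minimal, self-contained sketch of that pattern (names here are illustrative; the actual worker pool lives in `planRun` in `command/sync.go`): `numworkers` goroutines drain a shared channel and hash files concurrently.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"sync"
)

// hashFiles computes MD5 digests for the given paths with numWorkers
// goroutines draining a shared channel, mirroring how sync fans hash
// calculation out across --numworkers.
func hashFiles(paths []string, numWorkers int) map[string]string {
	jobs := make(chan string)
	out := make(map[string]string)
	var mu sync.Mutex
	var wg sync.WaitGroup

	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for p := range jobs {
				f, err := os.Open(p)
				if err != nil {
					continue // unreadable files are skipped in this sketch
				}
				h := md5.New()
				_, err = io.Copy(h, f)
				f.Close()
				if err != nil {
					continue
				}
				mu.Lock()
				out[p] = hex.EncodeToString(h.Sum(nil))
				mu.Unlock()
			}
		}()
	}

	for _, p := range paths {
		jobs <- p
	}
	close(jobs)
	wg.Wait()
	return out
}

func main() {
	fmt.Println(hashFiles([]string{"a.txt", "b.txt"}, 4))
}
```

Raising `--numworkers` therefore speeds up `--hash-only` syncs over many local files, at the cost of more concurrent disk reads.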

### concurrency

`concurrency` is a `cp` command option. It sets the number of parts that will be uploaded or downloaded in parallel for a single file.
69 changes: 42 additions & 27 deletions command/sync.go
@@ -52,22 +52,25 @@ Examples:
05. Sync S3 bucket to local folder but use size as only comparison criteria.
> s5cmd {{.HelpName}} --size-only "s3://bucket/*" folder/

06. Sync a file to S3 bucket
06. Sync S3 bucket to local folder but use size and hash as comparison criteria.
> s5cmd {{.HelpName}} --hash-only "s3://bucket/*" folder/

07. Sync a file to S3 bucket
> s5cmd {{.HelpName}} myfile.gz s3://bucket/

07. Sync matching S3 objects to another bucket
08. Sync matching S3 objects to another bucket
> s5cmd {{.HelpName}} "s3://bucket/*.gz" s3://target-bucket/prefix/

08. Perform KMS Server Side Encryption of the object(s) at the destination
09. Perform KMS Server Side Encryption of the object(s) at the destination
> s5cmd {{.HelpName}} --sse aws:kms s3://bucket/object s3://target-bucket/prefix/object

09. Perform KMS-SSE of the object(s) at the destination using customer managed Customer Master Key (CMK) key id
10. Perform KMS-SSE of the object(s) at the destination using customer managed Customer Master Key (CMK) key id
> s5cmd {{.HelpName}} --sse aws:kms --sse-kms-key-id <your-kms-key-id> s3://bucket/object s3://target-bucket/prefix/object

10. Sync all files to S3 bucket but exclude the ones with txt and gz extension
11. Sync all files to S3 bucket but exclude the ones with txt and gz extension
> s5cmd {{.HelpName}} --exclude "*.txt" --exclude "*.gz" dir/ s3://bucket

11. Sync all files to S3 bucket but include the only ones with txt and gz extension
12. Sync all files to S3 bucket but include only the ones with txt and gz extension
> s5cmd {{.HelpName}} --include "*.txt" --include "*.gz" dir/ s3://bucket
`

@@ -81,6 +84,10 @@ func NewSyncCommandFlags() []cli.Flag {
Name: "size-only",
Usage: "make size of object only criteria to decide whether an object should be synced",
},
&cli.BoolFlag{
Name: "hash-only",
Usage: "make hash and size of object only criteria to decide whether an object should be synced",
},
&cli.BoolFlag{
Name: "exit-on-error",
Usage: "stops the sync process if an error is received",
@@ -130,6 +137,7 @@ type Sync struct {
// flags
delete bool
sizeOnly bool
hashOnly bool
exitOnError bool

// s3 options
@@ -138,6 +146,7 @@ type Sync struct {
followSymlinks bool
storageClass storage.StorageClass
raw bool
numWorkers int

srcRegion string
dstRegion string
@@ -154,12 +163,14 @@ func NewSync(c *cli.Context) Sync {
// flags
delete: c.Bool("delete"),
sizeOnly: c.Bool("size-only"),
hashOnly: c.Bool("hash-only"),
exitOnError: c.Bool("exit-on-error"),

// flags
followSymlinks: !c.Bool("no-follow-symlinks"),
storageClass: storage.StorageClass(c.String("storage-class")),
raw: c.Bool("raw"),
numWorkers: c.Int("numworkers"),
// region settings
srcRegion: c.String("source-region"),
dstRegion: c.String("destination-region"),
@@ -228,11 +239,11 @@ func (s Sync) Run(c *cli.Context) error {
}
}()

strategy := NewStrategy(s.sizeOnly) // create comparison strategy.
pipeReader, pipeWriter := io.Pipe() // create a reader, writer pipe to pass commands to run
strategy := NewStrategy(s.sizeOnly, s.hashOnly) // create comparison strategy.
pipeReader, pipeWriter := io.Pipe() // create a reader, writer pipe to pass commands to run

// Create commands in background.
go s.planRun(c, onlySource, onlyDest, commonObjects, dsturl, strategy, pipeWriter, isBatch)
go s.planRun(c, onlySource, onlyDest, commonObjects, dsturl, strategy, pipeWriter, isBatch, s.numWorkers)

err = NewRun(c, pipeReader).Run(ctx)
return multierror.Append(err, merrorWaiter).ErrorOrNil()
@@ -444,6 +455,7 @@ func (s Sync) planRun(
strategy SyncStrategy,
w io.WriteCloser,
isBatch bool,
numWorkers int,
) {
defer w.Close()

@@ -474,26 +486,29 @@
}()

// both in source and destination
wg.Add(1)
go func() {
defer wg.Done()
for commonObject := range common {
sourceObject, destObject := commonObject.src, commonObject.dst
curSourceURL, curDestURL := sourceObject.URL, destObject.URL
err := strategy.ShouldSync(sourceObject, destObject) // check if object should be copied.
if err != nil {
printDebug(s.op, err, curSourceURL, curDestURL)
continue
}
// spawn numWorkers goroutines because HashStrategy may need to read many files from the file system
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for commonObject := range common {
sourceObject, destObject := commonObject.src, commonObject.dst
curSourceURL, curDestURL := sourceObject.URL, destObject.URL
err := strategy.ShouldSync(sourceObject, destObject) // check if object should be copied.
if err != nil {
printDebug(s.op, err, curSourceURL, curDestURL)
continue
}

command, err := generateCommand(c, "cp", defaultFlags, curSourceURL, curDestURL)
if err != nil {
printDebug(s.op, err, curSourceURL, curDestURL)
continue
command, err := generateCommand(c, "cp", defaultFlags, curSourceURL, curDestURL)
if err != nil {
printDebug(s.op, err, curSourceURL, curDestURL)
continue
}
fmt.Fprintln(w, command)
}
fmt.Fprintln(w, command)
}
}()
}()
}

// only in destination
wg.Add(1)
61 changes: 60 additions & 1 deletion command/sync_strategy.go
@@ -1,6 +1,11 @@
package command

import (
"crypto/md5"
"encoding/hex"
"io"
"os"

errorpkg "github.com/peak/s5cmd/v2/error"
"github.com/peak/s5cmd/v2/storage"
)
@@ -11,9 +16,11 @@ type SyncStrategy interface {
ShouldSync(srcObject, dstObject *storage.Object) error
}

func NewStrategy(sizeOnly bool) SyncStrategy {
func NewStrategy(sizeOnly bool, hashOnly bool) SyncStrategy {
if sizeOnly {
return &SizeOnlyStrategy{}
} else if hashOnly {
return &HashStrategy{}
} else {
return &SizeAndModificationStrategy{}
}
@@ -50,3 +57,55 @@ func (sm *SizeAndModificationStrategy) ShouldSync(srcObj, dstObj *storage.Object

return errorpkg.ErrObjectIsNewerAndSizesMatch
}

// HashStrategy decides whether to sync based on the objects' hashes and sizes.
// It treats the source object as the source of truth; the source object can be a local file or remote (S3).
//
// md5 hash: src != dst	should sync: yes
// md5 hash: src == dst	should sync: no
// md5 hash: src multipart upload	should sync: yes (always, since a multipart ETag is not a plain MD5)
// md5 hash: can't open src	should sync: yes (but cp won't be able to open the file either)
type HashStrategy struct{}

func (s *HashStrategy) ShouldSync(srcObj, dstObj *storage.Object) error {
// Check sizes first: if they differ, the objects cannot be identical and must be synced.
if srcObj.Size != dstObj.Size {
return nil
}

srcHash := getHash(srcObj)
dstHash := getHash(dstObj)
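// Note: a multipart-upload ETag has the form "<md5>-<parts>" and never
// equals a plain MD5 digest, so the comparison below always syncs
// multipart objects.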

if srcHash == dstHash {
return errorpkg.ErrObjectEtagsMatch
}

return nil
}

func getHash(obj *storage.Object) string {
// A remote (S3) object carries its hash in the ETag field; a local object
// that already has an ETag set can return it directly.
if obj.URL.IsRemote() || obj.Etag != "" {
return obj.Etag
} else {
// cp.go opens the file again later; it may be possible to avoid opening the
// file twice just to calculate the hash. fs.go's Stat already loads file
// metadata and could compute the MD5 there, but that is not necessary.
file, err := os.OpenFile(obj.URL.String(), os.O_RDONLY, 0644)
// If the source file can't be opened, return an empty hash so the object is
// pushed to storage. It might be better to report an error and stop here, but
// a file that can't be opened now won't be openable by cp for upload either.
if err != nil {
return ""
}
defer file.Close()

var md5Obj = md5.New()
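// The buffer is allocated as large as the file so the copy typically
// completes in a single read; for very large files this uses memory
// proportional to the file size. Note that a zero-byte file would make
// io.CopyBuffer panic, since it panics on a non-nil, zero-length buffer.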
buf := make([]byte, obj.Size)
if _, err := io.CopyBuffer(md5Obj, file, buf); err != nil {
return ""
}

return hex.EncodeToString(md5Obj.Sum(nil))
}
}