...
 
Commits (113)
language: go
script:
- go test -v
before_install:
- export TZ=Europe/Berlin
- sudo apt-get install -y zsh
- go get github.com/daviddengcn/go-colortext
/*
* Copyright © 2014 Sebastian Stark <stark@tuebingen.mpg.de>
* Copyright © 2018 Sebastian Stark <sstark@mailbox.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
......
BIN= snaprd
PREFIX= /usr/local
${BIN}: *.go Makefile
go get github.com/daviddengcn/go-colortext
go build -o ${BIN}
checkfmt:
@gofmt -d *.go
test:
env TZ=Europe/Berlin go test -cover -race
install: ${BIN}
install ${BIN} ${PREFIX}/bin
clean:
rm -f ${BIN}
This diff is collapsed.
snaprd - backup utility
============================
NOT READY FOR PRODUCTION USE
============================
Overview
--------
- continuous creation of snapshots at certain intervals
- pruning (sieving) snapshots based on fixed schedule, make
snapshots more scarce the older they get
- uses rsync to create snapshots
- every snapshot is a complete copy, using hard links to
save disk space
- designed to run silently in the background
- repository is designed to be exported via e. g. nfs or smb
to enable users to do restores of single files or directories
Building
--------
Run the script ./build.sh in this directory.
Testing
-------
To run regression testing, run the script ./test.sh in this
directory. It is not necessary to build beforehand.
- pretty symlinks for users
- check for free space
- make list command show snapshots as a tree. Maybe optional
- investigate rsync option -y, --fuzzy
- how to deal with oldest snapshot?
......@@ -11,7 +9,6 @@
- think about if it is useful to add the full origin path name to the repository subdirs
- regularly log memory stats
- deal with negative time shifts in transComplete()
- user configurable schedules? Or many predefined ones?
- subcommand "schedule" to show details about schedules:
- expected number of snapshots
- expected disk usage, given a start value + daily changes
......@@ -22,3 +19,8 @@
- Read http://golang.org/ref/spec#Receive_operator again and rethink subcmdRun()
design. Use close(c) when appropriate.
- support more than one directory to backup (avoid having to run many instances on a system)
- mail hook in case of failed/missed backup
- Test failure and non-failure rsync errors (e. g. 24)
- "snaprd log" subcmd to print log ring buffer
- extend sched subcmd to be more useful
- parse rsync output and fill some extra info struct that can be stored in the repository
#!/bin/bash
go build -race -o snaprd src/*.go
#!/bin/bash
if [[ $1 == "fix" ]]
then
diff="-w"
else
diff="-d"
fi
gofmt -tabs=false -tabwidth=4 $diff src/*.go
......@@ -6,15 +6,32 @@
package main
import (
"time"
"time"
)
type Clock interface {
Now() time.Time
type clock interface {
Now() time.Time
}
type realClock struct{}
func (realClock) Now() time.Time {
return time.Now()
return time.Now()
}
type skewClock struct {
skew time.Duration
}
func (cl *skewClock) Now() time.Time {
return time.Now().Add(-cl.skew)
}
func newSkewClock(i int64) *skewClock {
d := time.Now().Sub(time.Unix(i, 0))
return &skewClock{skew: d}
}
func (cl *skewClock) forward(d time.Duration) {
cl.skew -= d
}
/* See the file "LICENSE.txt" for the full license governing this code. */
package main
import (
"testing"
"time"
)
// TestSkewClock verifies that a skewed clock reports the requested start
// time as "now" and that forward() advances it by the given amount.
func TestSkewClock(t *testing.T) {
	const start int64 = 18
	const step int64 = 5
	cl := newSkewClock(start)
	before := cl.Now().Unix()
	cl.forward(time.Duration(step) * time.Second)
	after := cl.Now().Unix()
	if before != start {
		t.Errorf("wanted %d, but got %v", start, before)
	}
	if after != start+step {
		t.Errorf("wanted %d, but got %v", start+step, after)
	}
}
/* See the file "LICENSE.txt" for the full license governing this code. */
// Global configuration with disk caching
// Parsing of command line flags
package main
import (
"encoding/json"
"errors"
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
)
const (
	myName               = "snaprd"                          // program name, used in paths and log output
	defaultSchedFileName = "/etc/" + myName + ".schedules"   // system-wide schedule definition file
	dataSubdir           = ".data"                           // repository subdirectory holding the actual snapshots
	defaultRepository    = "/tmp/snaprd_dest"                // fallback snapshot destination
)
// opts collects rsync options given on the command line as a list of
// strings. It implements the flag.Value interface.
type opts []string

// String returns the options as one quoted, space separated string.
// Bug fix: it previously joined with "" although Set splits on " ",
// so String was not the inverse of Set.
func (o *opts) String() string {
	return fmt.Sprintf("\"%s\"", strings.Join(*o, " "))
}

// Set splits a space separated value into the individual options.
func (o *opts) Set(value string) error {
	*o = strings.Split(value, " ")
	return nil
}
// Config is used as a backing store for parsed flags
type Config struct {
	RsyncPath    string  // path to the rsync binary
	RsyncOpts    opts    // additional options passed to rsync
	Origin       string  // data source to back up
	repository   string  // snapshot destination; unexported, so not part of the json cache
	Schedule     string  // name of the snapshot schedule to use
	verbose      bool    // list: show more information
	showAll      bool    // list: include incomplete snapshots
	MaxKeep      int     // snapshots to keep in the highest (oldest) interval; 0 keeps all
	NoPurge      bool    // do not delete obsolete snapshots
	NoWait       bool    // skip the initial wait before the first snapshot
	NoLogDate    bool    // omit date/time from log output (e.g. for syslog)
	SchedFile    string  // path to external schedule definitions
	MinPercSpace float64 // keep at least this percentage of the filesystem free
	MinGiBSpace  int     // keep at least this many GiB of the filesystem free
	Notify       string  // email address for failure/issue reports
	noColor      bool    // list: disable colored output
}
// WriteCache writes the global configuration to disk as a json file.
func (c *Config) WriteCache() error {
	cacheFile := filepath.Join(c.repository, "."+myName+".settings")
	debugf("trying to write cached settings to %s", cacheFile)
	data, err := json.MarshalIndent(c, "", " ")
	if err != nil {
		log.Println("could not write config:", err)
		return err
	}
	return ioutil.WriteFile(cacheFile, data, 0644)
}
// ReadCache reads from the json configuration cache and resets assorted global
// configuration values from it.
func (c *Config) ReadCache() error {
	t := new(Config)
	cacheFile := filepath.Join(c.repository, "."+myName+".settings")
	debugf("trying to read cached settings from %s", cacheFile)
	// Bug fix: use the cacheFile computed above instead of re-joining
	// the same path a second time.
	b, err := ioutil.ReadFile(cacheFile)
	if err != nil {
		return err
	}
	if err := json.Unmarshal(b, &t); err != nil {
		return err
	}
	c.RsyncPath = t.RsyncPath
	c.RsyncOpts = t.RsyncOpts
	if t.SchedFile != "" {
		c.SchedFile = t.SchedFile
		// previously this error was silently ignored
		if err := schedules.addFromFile(c.SchedFile); err != nil {
			return err
		}
	}
	c.Origin = t.Origin
	if _, ok := schedules[t.Schedule]; !ok {
		return fmt.Errorf("no such schedule: %s", t.Schedule)
	}
	c.Schedule = t.Schedule
	c.MaxKeep = t.MaxKeep
	c.NoPurge = t.NoPurge
	c.MinPercSpace = t.MinPercSpace
	c.MinGiBSpace = t.MinGiBSpace
	return nil
}
// subcmd holds the subcommand given as the first command line argument.
var subcmd string

// usage prints the program version and a summary of the available
// subcommands with usage examples to stdout.
func usage() {
	fmt.Printf("%s %s\n", myName, version)
	fmt.Printf(`usage: %[1]s <command> <options>
Commands:
run Periodically create snapshots
list List snapshots
scheds List schedules
help Show usage instructions
Use <command> -h to show possible options for <command>.
Examples:
%[1]s run -origin=fileserver:/export/projects -repository=/snapshots/projects
%[1]s list -repository=/snapshots/projects
`, myName)
}
// loadConfig reads the subcommand from os.Args, parses the flags that
// belong to it into a fresh Config and prepares the repository/schedules
// as needed. For "help" it prints usage and exits; for unknown
// subcommands it returns an error.
func loadConfig() (*Config, error) {
	config := new(Config)
	if len(os.Args) > 1 {
		subcmd = os.Args[1]
	} else {
		return nil, errors.New("no subcommand given")
	}
	switch subcmd {
	case "run":
		flags := flag.NewFlagSet(subcmd, flag.ContinueOnError)
		flags.StringVar(&(config.RsyncPath),
			"rsyncPath", "/usr/bin/rsync",
			"path to rsync binary")
		flags.Var(&(config.RsyncOpts),
			"rsyncOpts",
			"additional options for rsync")
		flags.StringVar(&(config.Origin),
			"origin", "/tmp/snaprd_test/",
			"data source")
		flags.StringVar(&(config.repository),
			"repository", defaultRepository,
			"where to store snapshots")
		flags.StringVar(&(config.repository),
			"r", defaultRepository,
			"(shorthand for -repository)")
		flags.StringVar(&(config.Schedule),
			"schedule", "longterm",
			"one of "+schedules.String())
		flags.IntVar(&(config.MaxKeep),
			"maxKeep", 0,
			"how many snapshots to keep in highest (oldest) interval. Use 0 to keep all")
		flags.BoolVar(&(config.NoPurge),
			"noPurge", false,
			"if set, obsolete snapshots will not be deleted (minimum space requirements will still be honoured)")
		flags.BoolVar(&(config.NoWait),
			"noWait", false,
			"if set, skip the initial waiting time before the first snapshot")
		flags.BoolVar(&(config.NoLogDate),
			"noLogDate", false,
			"if set, does not print date and time in the log output. Useful if output is redirected to syslog")
		flags.StringVar(&(config.SchedFile),
			"schedFile", defaultSchedFileName,
			"path to external schedules")
		flags.Float64Var(&(config.MinPercSpace),
			"minPercSpace", 0,
			"if set, keep at least x% of the snapshots filesystem free")
		flags.IntVar(&(config.MinGiBSpace),
			"minGbSpace", 0,
			"if set, keep at least x GiB of the snapshots filesystem free")
		flags.StringVar(&(config.Notify),
			"notify", "",
			"specify an email address to send reports")
		if err := flags.Parse(os.Args[2:]); err != nil {
			return nil, err
		}
		if config.SchedFile != "" {
			if err := schedules.addFromFile(config.SchedFile); err != nil {
				return nil, err
			}
		}
		if _, ok := schedules[config.Schedule]; !ok {
			// error strings carry no trailing newline (go vet)
			return nil, fmt.Errorf("no such schedule: %s", config.Schedule)
		}
		path := filepath.Join(config.repository, dataSubdir)
		debugf("creating repository: %s", path)
		// was 00755 (stray extra zero); same value, clearer literal
		if err := os.MkdirAll(path, 0755); err != nil {
			return nil, err
		}
		// a failing settings cache is not fatal, the run can proceed
		if err := config.WriteCache(); err != nil {
			log.Println("could not write settings cache file:", err)
		}
		return config, nil
	case "list":
		flags := flag.NewFlagSet(subcmd, flag.ContinueOnError)
		flags.StringVar(&(config.repository),
			"repository", defaultRepository,
			"where snapshots are located")
		flags.StringVar(&(config.repository),
			"r", defaultRepository,
			"(shorthand for -repository)")
		flags.BoolVar(&(config.verbose),
			"v", false,
			"show more information")
		flags.BoolVar(&(config.showAll),
			"a", false,
			"show all snapshots. Otherwise only complete snapshots are shown")
		flags.StringVar(&(config.Schedule),
			"schedule", "longterm",
			"one of "+schedules.String())
		flags.StringVar(&(config.SchedFile),
			"schedFile", defaultSchedFileName,
			"path to external schedules")
		flags.BoolVar(&(config.noColor),
			"noColor", false,
			"do not colorize list output")
		if err := flags.Parse(os.Args[2:]); err != nil {
			return nil, err
		}
		if config.SchedFile != "" {
			if err := schedules.addFromFile(config.SchedFile); err != nil {
				return nil, err
			}
		}
		if err := config.ReadCache(); err != nil {
			return nil, fmt.Errorf("error reading repository settings: %s", err)
		}
		debugf("cached config: %v", config)
		return config, nil
	case "help", "-h", "--help":
		usage()
		os.Exit(0)
	case "scheds":
		flags := flag.NewFlagSet(subcmd, flag.ContinueOnError)
		flags.StringVar(&(config.SchedFile),
			"schedFile", defaultSchedFileName,
			"path to external schedules")
		if err := flags.Parse(os.Args[2:]); err != nil {
			return nil, err
		}
		if config.SchedFile != "" {
			// report broken schedule files instead of silently ignoring them
			if err := schedules.addFromFile(config.SchedFile); err != nil {
				return nil, err
			}
		}
		return config, nil
	default:
		return nil, fmt.Errorf("unknown subcommand %q, try \"help\"", subcmd)
	}
	return nil, nil
}
......@@ -44,6 +44,6 @@ for ((n=0; n<3; n++)); do
sleep ${sleep:-0}
done
mkdir $2
mkdir -p $3
exit $retval
/* See the file "LICENSE.txt" for the full license governing this code. */
// Low-level filesystem utilities
package main
import (
"fmt"
"io/ioutil"
"log"
"os"
"path"
"strings"
"syscall"
)
// GiB is exactly one gibibyte (2^30)
const GiB = 1024 * 1024 * 1024

// checkFreeSpace verifies the space constraints specified by the user. Return
// true if all the constraints are satisfied, or in case something unusual
// happens.
func checkFreeSpace(baseDir string, minPerc float64, minGiB int) bool {
	// Skip the system call entirely when no constraint is configured.
	if minPerc <= 0 && minGiB <= 0 {
		return true
	}
	var fsStat syscall.Statfs_t
	debugf("Trying to check free space in %s", baseDir)
	if err := syscall.Statfs(baseDir, &fsStat); err != nil {
		log.Println("could not check free space:", err)
		// On error, report the constraints as satisfied; otherwise we
		// risk deleting more than we should.
		return true
	}
	totalBytes := uint64(fsStat.Bsize) * fsStat.Blocks
	freeBytes := uint64(fsStat.Bsize) * fsStat.Bfree
	debugf("We have %f GiB, and %f GiB of them are free.", float64(totalBytes)/GiB, float64(freeBytes)/GiB)
	// Both the absolute and the relative constraint must hold.
	enoughAbsolute := int(freeBytes/GiB) >= minGiB
	enoughRelative := (100 * float64(freeBytes) / float64(totalBytes)) >= minPerc
	return enoughAbsolute && enoughRelative
}
// updateSymlinks creates user-friendly symlinks to all complete snapshots. It
// also removes symlinks to snapshots that have been purged.
func updateSymlinks() {
	entries, err := ioutil.ReadDir(config.repository)
	if err != nil {
		log.Println("could not read repository directory", config.repository)
		return
	}
	// First pass: remove symlinks whose snapshot no longer exists.
	for _, f := range entries {
		pathName := path.Join(config.repository, f.Name())
		if isDanglingSymlink(pathName) {
			debugf("symlink %s is dangling, remove", pathName)
			err := os.Remove(pathName)
			if err != nil {
				log.Println("could not remove link", pathName)
			}
		}
	}
	cl := new(realClock)
	snapshots, err := findSnapshots(cl)
	if err != nil {
		log.Println("could not list snapshots")
		return
	}
	// Second pass: (re)create a human-readable link, named after the
	// snapshot's start time, for every complete snapshot.
	for _, s := range snapshots.state(stateComplete, none) {
		target := path.Join(dataSubdir, s.Name())
		stime := s.startTime.Format("Monday_2006-01-02_15.04.05")
		linkname := path.Join(config.repository, stime)
		overwriteSymlink(target, linkname)
	}
	return
}
// isDanglingSymlink returns true only if linkname is a relative symlink
// pointing to a non-existing path in dataSubdir.
func isDanglingSymlink(linkname string) bool {
	target, err := os.Readlink(linkname)
	if err != nil {
		// not a symlink, or not readable: not dangling by definition
		return false
	}
	if path.IsAbs(target) {
		return false
	}
	elems := strings.Split(target, "/")
	if len(elems) == 0 || elems[0] != dataSubdir {
		return false
	}
	// dangling means: the target does not exist relative to the link's
	// directory
	_, err = os.Stat(path.Join(path.Dir(linkname), target))
	return err != nil && os.IsNotExist(err)
}
// overwriteSymlink creates a symbolic link from linkname to target. It will
// overwrite an already existing link under linkname, but not if it finds a
// regular file or directory (or anything else which is not a symlink) under
// that name.
func overwriteSymlink(target, linkname string) (err error) {
	fi, err := os.Lstat(linkname)
	// If Lstat failed, linkname does not exist or cannot be read; fi is
	// nil then and we simply try to create the link below.
	if fi != nil {
		if fi.Mode()&os.ModeSymlink == 0 {
			// refuse to touch anything that is not a symlink
			return fmt.Errorf("won't overwrite %s, it is not a symlink (%v)", linkname, fi.Mode())
		}
		ltarget, lerr := os.Readlink(linkname)
		if lerr != nil {
			debugf("could not read %s: %v", linkname, lerr)
		}
		// short cut if the link is already pointing to the desired target
		if ltarget == target {
			return nil
		}
		debugf("symlink \"%s\" needs removal: %s != %s", linkname, target, ltarget)
		if err = os.Remove(linkname); err != nil {
			// link can not be removed
			return err
		}
	}
	return os.Symlink(target, linkname)
}
/* See the file "LICENSE.txt" for the full license governing this code. */
package main
import (
"log"
"os"
"path"
"syscall"
"testing"
)
// testDir is an existing directory on a real filesystem, used as the
// statfs target in these tests.
var testDir = os.Getenv("HOME")

// gatherTestData returns filesystem statistics for baseDir.
// Bug fix: the original ignored the baseDir parameter and always
// queried the testDir global.
func gatherTestData(baseDir string) (data syscall.Statfs_t) {
	err := syscall.Statfs(baseDir, &data)
	if err != nil {
		log.Println("could not check free space:", err)
	}
	return
}
// TestCheckFreeSpace exercises checkFreeSpace against the real
// statistics of testDir, deriving passing and failing limits from the
// actual free space.
func TestCheckFreeSpace(t *testing.T) {
	// First, gather the data. Bug fix: query testDir (the directory the
	// checks below run on) instead of "/", which may be a different
	// filesystem.
	data := gatherTestData(testDir)
	var actualFreePerc = 100 * float64(data.Bfree) / float64(data.Blocks)
	var actualFreeGiB = int(uint64(data.Bsize) * data.Bfree / GiB)
	// Now, let's make a quick run of the test
	var result bool
	result = checkFreeSpace(testDir, 0, 0)
	if !result {
		t.Errorf("Short run failure")
	}
	// Successful absolute free space
	result = checkFreeSpace(testDir, 0, actualFreeGiB/2)
	if !result {
		t.Errorf("Error in successful absolute free space test")
	}
	// Successful relative free space
	result = checkFreeSpace(testDir, actualFreePerc/2, 0)
	if !result {
		t.Errorf("Error in successful relative free space test")
	}
	// Successful combined free space
	result = checkFreeSpace(testDir, actualFreePerc/2, actualFreeGiB/2)
	if !result {
		t.Errorf("Error in successful combined free space test")
	}
	// Failed absolute free space
	result = checkFreeSpace(testDir, 0, actualFreeGiB*2)
	if result {
		t.Errorf("Error in failed absolute free space test")
	}
	// Failed relative free space (message fixed: said "absolute" before)
	result = checkFreeSpace(testDir, actualFreePerc*2, 0)
	if result {
		t.Errorf("Error in failed relative free space test")
	}
	// Failed combined free space
	result = checkFreeSpace(testDir, actualFreePerc*2, actualFreeGiB*2)
	if result {
		t.Errorf("Error in Failed combined free space test")
	}
}
// dslTestPair describes one isDanglingSymlink test case: the link name,
// its target, and the expected classification.
type dslTestPair struct {
	linkname   string
	target     string
	isDangling bool
}

// dslTestPairs covers absolute targets, targets outside dataSubdir,
// existing snapshots and missing snapshot directories.
var dslTestPairs = []dslTestPair{
	{"link1", path.Join(dataSubdir, mockSnapshots[0]), false},
	{"link2", path.Join(dataSubdir, mockSnapshots[1]), false},
	{"link3", "1400337531-1400337532-notexist", false},
	{"link4", path.Join("/absolute", dataSubdir, "1400337531-1400337532-notexist"), false},
	{"link5", "notdatasubdir/1400337531-1400337532-notexist", false},
	{"link6", path.Join(dataSubdir, "/1400337531-1400337532-notexist"), true},
	{"link7", dataSubdir, false},
	{"link8", path.Join(dataSubdir, "/notexist"), true},
}
// TestIsDanglingSymlink creates every symlink from dslTestPairs inside a
// mock repository and checks the classification against the expected
// value.
func TestIsDanglingSymlink(t *testing.T) {
	mockConfig()
	mockRepository()
	defer os.RemoveAll(config.repository)
	for i := range dslTestPairs {
		lname := path.Join(config.repository, dslTestPairs[i].linkname)
		tname := dslTestPairs[i].target
		overwriteSymlink(tname, lname)
		got := isDanglingSymlink(lname)
		wanted := dslTestPairs[i].isDangling
		if got != wanted {
			t.Errorf("%s: got %v, wanted %v", lname, got, wanted)
		}
	}
}
// TestOverwriteSymlink checks that overwriteSymlink refuses to replace
// directories and regular files, but does replace an existing symlink.
func TestOverwriteSymlink(t *testing.T) {
	mockConfig()
	mockRepository()
	defer os.RemoveAll(config.repository)
	// a directory must not be overwritten
	testdir := path.Join(config.repository, "somedir")
	os.Mkdir(testdir, 0777)
	err := overwriteSymlink("irrelevant", testdir)
	if err == nil {
		t.Errorf("%s was overwritten, but it shouldn't", testdir)
	}
	// a regular file must not be overwritten
	testfile := path.Join(config.repository, "somefile")
	_, _ = os.Create(testfile)
	err = overwriteSymlink("irrelevant", testfile)
	if err == nil {
		t.Errorf("%s was overwritten, but it shouldn't", testfile)
	}
	// an existing symlink may be overwritten
	testlink := path.Join(config.repository, "somelink")
	_ = os.Symlink("irrelevant", testlink)
	err = overwriteSymlink("irrelevant", testlink)
	if err != nil {
		t.Errorf("%s was not overwritten, but it should", testlink)
	}
}
// TestUpdateSymlinks runs updateSymlinks against the mock repository and
// verifies that the human-readable link for the first mock snapshot was
// created and points to the right data directory.
func TestUpdateSymlinks(t *testing.T) {
	mockConfig()
	mockRepository()
	defer os.RemoveAll(config.repository)
	updateSymlinks()
	// the link name is the snapshot's start time, formatted by updateSymlinks
	symlink := path.Join(config.repository, "Saturday_2014-05-17_16.38.51")
	target, err := os.Readlink(symlink)
	if target != path.Join(dataSubdir, mockSnapshots[0]) {
		t.Errorf("symlink %s -> %s is wrong or missing: %v", symlink, target, err)
	}
}
/* See the file "LICENSE.txt" for the full license governing this code. */
// Simple lock file mechanism to prevent multiple instances to run
package main
import (
"fmt"
"io/ioutil"
"log"
"os"
"strconv"
)
type pidLocker struct {
pid int
f string
}
func newPidLocker(lockfile string) *pidLocker {
return &pidLocker{
pid: os.Getpid(),
f: lockfile,
}
}
// Lock writes the pid file, failing if it already exists.
// NOTE(review): the Stat/WriteFile pair is not atomic, so two processes
// starting at the same instant could both pass the check — acceptable
// for this single-user tool, but worth confirming.
func (pl *pidLocker) Lock() error {
	if _, err := os.Stat(pl.f); err == nil {
		return fmt.Errorf("pid file %s already exists. Is snaprd running already?", pl.f)
	}
	debugf("write pid %d to pidfile %s", pl.pid, pl.f)
	if err := ioutil.WriteFile(pl.f, []byte(strconv.Itoa(pl.pid)), 0666); err != nil {
		return fmt.Errorf("could not write pid file %s: %s", pl.f, err)
	}
	return nil
}
// Unlock removes the pid file; a failure is only logged, not returned.
func (pl *pidLocker) Unlock() {
	debugf("delete pidfile %s", pl.f)
	if err := os.Remove(pl.f); err != nil {
		log.Printf("could not remove pid file %s: %s", pl.f, err)
	}
}
package main
import (
"fmt"
"io/ioutil"
"log"
"os/exec"
)
// FailureMail sends a notification about a fatal snaprd exit to the
// configured address, including the most recent log output from the
// ring buffer.
func FailureMail(exitCode int, logBuffer *RingIO) {
	mail := fmt.Sprintf("snaprd exited with return value %d.\nLatest log output:\n\n%s",
		exitCode, logBuffer.GetAsText())
	subject := fmt.Sprintf("snaprd failure (origin: %s)", config.Origin)
	SendMail(config.Notify, subject, mail)
}
// RsyncIssueMail notifies the configured address about a non-fatal rsync
// error. Sending happens in a goroutine so a slow or hanging mail
// command does not block the main loop.
func RsyncIssueMail(rsyncError error, rsyncErrorCode int) {
	var errText string
	// look up a human readable description for known ignorable codes
	if s, ok := rsyncIgnoredErrors[rsyncErrorCode]; ok {
		errText = s
	} else {
		errText = "<unknown>"
	}
	mail := fmt.Sprintf(`rsync finished with error: %s (%s).
This is a non-fatal error, snaprd will try again.`, rsyncError, errText)
	subject := fmt.Sprintf("snaprd rsync error (origin: %s)", config.Origin)
	// In this case we care that the mail command is not blocking the whole
	// program
	go SendMail(config.Notify, subject, mail)
}
// NotifyMail sends a short informational notice to the given address.
func NotifyMail(to, msg string) {
	SendMail(to, "snaprd notice", msg)
}
// SendMail pipes msg into the local "mail" command with the given
// subject and recipient. Errors are logged, not returned, since callers
// treat notification as best-effort.
func SendMail(to, subject, msg string) {
	sendmail := exec.Command("mail", "-s", subject, to)
	stdin, err := sendmail.StdinPipe()
	if err != nil {
		log.Println(err)
		return
	}
	stdout, err := sendmail.StdoutPipe()
	if err != nil {
		log.Println(err)
		return
	}
	// Bug fix: the Start error was silently ignored before.
	if err := sendmail.Start(); err != nil {
		log.Println("could not start mail command:", err)
		return
	}
	stdin.Write([]byte(msg))
	stdin.Write([]byte("\n"))
	stdin.Close()
	// drain the child's output so it can exit
	ioutil.ReadAll(stdout)
	// Bug fix: the Wait error was silently ignored before.
	if err := sendmail.Wait(); err != nil {
		log.Println("mail command failed:", err)
		return
	}
	log.Printf("sending notification to %s done\n", to)
}
/* See the file "LICENSE.txt" for the full license governing this code. */
// Functions according to sub commands given on command line
// Main snapshot creation and purging loops
package main
import (
"errors"
"flag"
"fmt"
"github.com/daviddengcn/go-colortext"
"io"
"io/ioutil"
"log"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
)
// initialWait is how long the run subcommand waits before making the
// first snapshot, unless -noWait is given.
const initialWait = time.Second * 30

// config is the global configuration, set by loadConfig in mainExitCode.
var config *Config

// logger is the program-wide logger, set up in mainExitCode.
var logger *log.Logger
// debugf logs a debug message, but only when the environment variable
// SNAPRD_DEBUG is set to "1".
func debugf(format string, args ...interface{}) {
	if os.Getenv("SNAPRD_DEBUG") != "1" {
		return
	}
	logger.Output(2, "<DEBUG> "+fmt.Sprintf(format, args...))
}
// lastGoodTicker is the clock for the create loop. It takes the last
// created snapshot on its input channel and outputs it on the output channel,
// but only after an appropriate waiting time. To start things off, the first
// lastGood snapshot has to be read from disk.
func lastGoodTicker(in, out chan *snapshot, cl clock) {
	var gap, wait time.Duration
	var sn *snapshot
	sn = lastGoodFromDisk(cl)
	if sn != nil {
		debugf("lastgood from disk: %s\n", sn.String())
	}
	// kick off the loop
	go func() {
		in <- sn
		return
	}()
	for {
		sn := <-in
		if sn != nil {
			// Wait out the remainder of the shortest schedule interval,
			// measured from the start of the last good snapshot.
			gap = cl.Now().Sub(sn.startTime)
			debugf("gap: %s", gap)
			wait = schedules[config.Schedule][0] - gap
			if wait > 0 {
				// SIGUSR2 skips the wait and forces the next snapshot.
				sigc := make(chan os.Signal, 1)
				signal.Notify(sigc, syscall.SIGUSR2)
				log.Println("wait", wait, "before next snapshot")
				select {
				case <-sigc:
					log.Println("Snapshot forced by signal, skipping wait time.")
				case <-time.After(wait):
					debugf("Awoken at %s\n", cl.Now())
				}
			}
		}
		out <- sn
	}
}
// subcmdRun is the main, long-running routine and starts off a couple of
// helper goroutines.
func subcmdRun() (ferr error) {
	// Ensure only one snaprd instance works on this repository.
	pl := newPidLocker(filepath.Join(config.repository, ".pid"))
	err := pl.Lock()
	if err != nil {
		ferr = err
		return
	}
	defer pl.Unlock()
	// Optional grace period before the first snapshot; SIGINT/SIGTERM
	// during the wait aborts cleanly.
	if !config.NoWait {
		sigc := make(chan os.Signal, 1)
		signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
		log.Printf("waiting %s before making snapshots\n", initialWait)
		select {
		case <-sigc:
			return errors.New("-> Early exit")
		case <-time.After(initialWait):
		}
	}
	createExit := make(chan bool)
	createExitDone := make(chan error)
	// The obsoleteQueue should not be larger than the absolute number of
	// expected snapshots. However, there is no way (yet) to calculate that
	// number.
	obsoleteQueue := make(chan *snapshot, 10000)
	lastGoodIn := make(chan *snapshot)
	lastGoodOut := make(chan *snapshot)
	// Empty type for the channel: we don't care about what is inside, only
	// about the fact that there is something inside
	freeSpaceCheck := make(chan struct{})
	cl := new(realClock)
	go lastGoodTicker(lastGoodIn, lastGoodOut, cl)
	// Snapshot creation loop
	go func() {
		var lastGood *snapshot
		var createError error
	CREATE_LOOP:
		for {
			debugf("start of create loop")
			select {
			case <-createExit:
				debugf("gracefully exiting snapshot creation goroutine")
				lastGoodOut = nil
				break CREATE_LOOP
			case lastGood = <-lastGoodOut:
				sn, err := createSnapshot(lastGood)
				if err != nil || sn == nil {
					debugf("snapshot creation finally failed (%s), the partial transfer will hopefully be reused", err)
					createError = err
					go func() {
						// need to stop the lastGoodTicker here because it could
						// happen that it will be faster and the create loop would
						// run again instead of exiting
						lastGoodOut = nil
						debugf("subcmdRun: sending createExit")
						createExit <- true
						debugf("subcmdRun: createExit sent")
						return
					}()
					time.Sleep(time.Second)
				}
				lastGoodIn <- sn
				debugf("pruning")
				prune(obsoleteQueue, cl)
				// If we purge automatically all the expired snapshots,
				// there's nothing to remove to free space.
				if config.NoPurge {
					debugf("checking space constraints")
					freeSpaceCheck <- struct{}{}
				}
			}
		}
		createExitDone <- createError
	}()
	debugf("started snapshot creation goroutine")
	// Usually the purger gets its input only from prune(). But there could be
	// snapshots left behind from a previously failed snaprd run, so we fill
	// the obsoleteQueue once at the beginning.
	for _, sn := range findDangling(cl) {
		obsoleteQueue <- sn
	}
	// Purger loop
	go func() {
		for {
			if sn := <-obsoleteQueue; !config.NoPurge {
				sn.purge()
			}
		}
	}()
	debugf("started purge goroutine")
	// If we are going to automatically purge all expired snapshots, we
	// needn't even starting the gofunc
	if config.NoPurge {
		// Free space claiming function
		go func() {
			for {
				// Wait until we are ordered to do something
				<-freeSpaceCheck
				// Get all obsolete snapshots
				// This returns a sorted list
				snapshots, err := findSnapshots(cl)
				if err != nil {
					log.Println(err)
					return
				}
				if len(snapshots) < 2 {
					log.Println("less than 2 snapshots found, not pruning")
					return
				}
				obsolete := snapshots.state(stateObsolete, none)
				// We only delete as long as we need *AND* we have something to delete
				for !checkFreeSpace(config.repository, config.MinPercSpace, config.MinGiBSpace) && len(obsolete) > 0 {
					// If there is not enough space, purge the oldest snapshot
					last := len(obsolete) - 1
					obsolete[last].purge()
					// We remove it from the list, it's quicker than recalculating the list.
					obsolete = obsolete[:last]
				}
			}
		}()
	}
	// Global signal handling: INT/TERM exit immediately, USR1 asks the
	// create loop to finish its current snapshot first.
	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1)
	select {
	case sig := <-sigc:
		debugf("Got signal %s", sig)
		switch sig {
		case syscall.SIGINT, syscall.SIGTERM:
			log.Println("-> Immediate exit")
		case syscall.SIGUSR1:
			log.Println("-> Graceful exit")
			createExit <- true
			ferr = <-createExitDone
		}
	// ferr will hold the error that happened in the CREATE_LOOP
	case ferr = <-createExitDone:
		log.Println("-> Rsync exit")
	}
	return
}
// subcmdList give the user an overview of what's in the repository.
// A nil clock selects the real system clock; tests pass a skewed one.
func subcmdList(cl clock) {
	intervals := schedules[config.Schedule]
	if cl == nil {
		cl = new(realClock)
	}
	snapshots, err := findSnapshots(cl)
	if err != nil {
		log.Println(err)
	}
	// Walk the schedule intervals from oldest to newest and print the
	// snapshots falling into each.
	for n := len(intervals) - 2; n >= 0; n-- {
		debugf("listing interval %d", n)
		if config.showAll {
			snapshots = snapshots.state(any, none)
		} else {
			snapshots = snapshots.state(stateComplete, none)
		}
		snapshots := snapshots.interval(intervals, n, cl)
		debugf("snapshots in interval %d: %s", n, snapshots)
		if n < len(intervals)-2 {
			ct.Foreground(ct.Yellow, false)
			fmt.Printf("### From %s ago, %d/%d\n", intervals.offset(n+1), len(snapshots), intervals.goal(n))
			ct.ResetColor()
		} else {
			// The oldest interval has no per-interval goal; show whichever
			// retention limit applies instead.
			ct.Foreground(ct.Yellow, false)
			if config.MaxKeep != 0 {
				fmt.Printf("### From past, %d/%d\n", len(snapshots), config.MaxKeep)
			} else if config.MinPercSpace != 0 {
				fmt.Printf("### From past, %d/(keep %.1f%% free)\n", len(snapshots), config.MinPercSpace)
			} else if config.MinGiBSpace != 0 {
				fmt.Printf("### From past, %d/(keep %dGiB free)\n", len(snapshots), config.MinGiBSpace)
			} else {
				fmt.Printf("### From past, %d/∞\n", len(snapshots))
			}
			ct.ResetColor()
		}
		for i, sn := range snapshots {
			stime := sn.startTime.Format("2006-01-02 Monday 15:04:05")
			// dur is the snapshot's creation duration, dist the distance
			// to the next (newer) snapshot
			var dur, dist time.Duration
			if i < len(snapshots)-1 {
				dist = snapshots[i+1].startTime.Sub(sn.startTime)
			}
			if sn.endTime.After(sn.startTime) {
				dur = sn.endTime.Sub(sn.startTime)
			}
			if config.verbose {
				fmt.Printf("%d %s (%s, %s/%s, %s) \"%s\"\n", n, stime, dur, intervals[n], dist, sn.state, sn.Name())
			} else {
				fmt.Printf("%s (%s, %s)\n", stime, dur, intervals[n])
			}
		}
	}
}
// mainExitCode runs the selected subcommand and returns the process exit
// code: 0 on success, 1 for configuration/flag errors, 2 when the "run"
// subcommand fails at runtime.
func mainExitCode(logIO io.Writer) int {
	logger = log.New(logIO, "", log.Ldate|log.Ltime|log.Lshortfile)
	log.SetOutput(logIO)
	var err error
	if config, err = loadConfig(); err != nil || config == nil {
		// -h/--help is not an error
		if err == flag.ErrHelp {
			return 0
		}
		log.Println(err)
		return 1
	}
	if config.NoLogDate {
		log.SetFlags(logger.Flags() - log.Ldate - log.Ltime)
		logger.SetFlags(logger.Flags() - log.Ldate - log.Ltime)
	}
	switch subcmd {
	case "run":
		log.Printf("%s %s started with pid %d\n", myName, version, os.Getpid())
		log.Printf("### Repository: %s, Origin: %s, Schedule: %s\n", config.repository, config.Origin, config.Schedule)
		err = subcmdRun()
		if err != nil {
			log.Println(err)
			return 2
		}
	case "list":
		// -noColor redirects the colortext writer to a sink
		if config.noColor {
			ct.Writer = ioutil.Discard
		}
		ct.Foreground(ct.Green, false)
		fmt.Printf("### Repository: %s, Origin: %s, Schedule: %s\n", config.repository, config.Origin, config.Schedule)
		ct.ResetColor()
		subcmdList(nil)
	case "scheds":
		schedules.list()
	}
	return 0
}
// main wires stderr logging through an in-memory ring buffer, runs the
// program, and sends a failure notification before exiting if requested.
func main() {
	rio := newRingIO(os.Stderr, 25, 100)
	exitCode := mainExitCode(rio)
	// do not send a notification when error code is 0 or 1 (error in flag handling)
	// because in the case 1 we can not access the config yet.
	if exitCode > 1 && config.Notify != "" {
		FailureMail(exitCode, rio)
	}
	os.Exit(exitCode)
}
package main
import "os"
// ExampleSubcmdList runs the list subcommand against the mock repository
// with a clock skewed to startAt; the expected listing is pinned by the
// Output comment.
func ExampleSubcmdList() {
	mockConfig()
	mockRepository()
	defer os.RemoveAll(config.repository)
	cl := newSkewClock(startAt)
	subcmdList(cl)
	// Output:
	// ### From past, 1/2
	// 2014-05-17 Saturday 16:38:51 (1s, 1m20s)
	// ### From 2m20s ago, 2/2
	// 2014-05-17 Saturday 16:40:11 (1s, 40s)
	// 2014-05-17 Saturday 16:40:51 (1s, 40s)
	// ### From 1m0s ago, 2/2
	// 2014-05-17 Saturday 16:41:11 (1s, 20s)
	// 2014-05-17 Saturday 16:41:31 (1s, 20s)
	// ### From 20s ago, 4/4
	// 2014-05-17 Saturday 16:41:46 (1s, 5s)
	// 2014-05-17 Saturday 16:41:51 (1s, 5s)
	// 2014-05-17 Saturday 16:41:56 (1s, 5s)
	// 2014-05-17 Saturday 16:42:01 (1s, 5s)
}
// ExampleScheds pins the output of the built-in schedule listing.
func ExampleScheds() {
	schedules.list()
	// Output:
	// longterm: [6h0m0s 24h0m0s 168h0m0s 672h0m0s 876000h0m0s]
	// shortterm: [10m0s 2h0m0s 24h0m0s 168h0m0s 672h0m0s 876000h0m0s]
	// test1: [24h0m0s 168h0m0s 672h0m0s 876000h0m0s]
	// testing: [5s 20s 2m20s 4m40s 876000h0m0s]
	// testing2: [5s 20s 40s 1m20s 876000h0m0s]
}
/* See the file "LICENSE.txt" for the full license governing this code. */
// Snapshot pruning ("aging")
// This is the core functionality: keep the number of snapshots within the
// user selected schedule's limits or maximum disk usage or other constraints
package main
import (
"log"
)
// Sieves snapshots according to schedule and marks them as obsolete. Also,
// enqueue them in the buffered channel q for later reuse or deletion.
func prune(q chan *snapshot, cl clock) {
	intervals := schedules[config.Schedule]
	// interval 0 does not need pruning, start with 1
	for i := len(intervals) - 2; i > 0; i-- {
		// Re-read the repository on every iteration: earlier iterations
		// (and the recursive call below) may have changed snapshot states.
		snapshots, err := findSnapshots(cl)
		if err != nil {
			log.Println(err)
			return
		}
		if len(snapshots) < 2 {
			log.Println("less than 2 snapshots found, not pruning")
			return
		}
		// iv: snapshots in interval i, filtered by state (see snapshot.state)
		iv := snapshots.interval(intervals, i, cl).state(stateComplete, stateObsolete)
		pruneAgain := false
		if len(iv) > 2 {
			// prune highest interval by maximum number
			if (i == len(intervals)-2) &&
				(len(iv) > config.MaxKeep) &&
				(config.MaxKeep != 0) {
				debugf("%d snapshots in oldest interval", len(iv))
				log.Printf("mark oldest as obsolete: %s", iv[0])
				err := iv[0].transObsolete()
				if err != nil {
					log.Printf("could not transition snapshot: %s", err)
				}
				q <- iv[0]
				pruneAgain = true
			}
			// regularly prune by sieving: when the two youngest snapshots
			// in this interval are closer together than the interval
			// length, the youngest one is redundant
			youngest := len(iv) - 1
			secondYoungest := youngest - 1
			dist := iv[youngest].startTime.Sub(iv[secondYoungest].startTime)
			if dist.Seconds() < intervals[i].Seconds() {
				log.Printf("mark as obsolete: %s", iv[youngest].Name())
				err := iv[youngest].transObsolete()
				if err != nil {
					log.Printf("could not transition snapshot: %s", err)
				}
				q <- iv[youngest]
				pruneAgain = true
			}
			// something changed: re-run the whole sieve until stable
			if pruneAgain {
				prune(q, cl)
			}
		}
	}
}
/* See the file "LICENSE.txt" for the full license governing this code. */
package main
import (
"io/ioutil"
"log"
"os"
"path/filepath"
"testing"
"time"
)
const (
	// startAt is the reference "current" Unix time used for the skewed
	// test clock.
	startAt int64 = 1400337722
)

// mockSnapshots are fake snapshot directory names (startTime-endTime-state)
// used to populate the test repository.
var mockSnapshots = []string{
	"1400337531-1400337532-complete",
	"1400337611-1400337612-complete",
	"1400337651-1400337652-complete",
	"1400337671-1400337672-complete",
	"1400337691-1400337692-complete",
	"1400337706-1400337707-complete",
	"1400337711-1400337712-complete",
	"1400337716-1400337717-complete",
	"1400337721-1400337722-complete",
}
// mockConfig creates a fresh temporary directory to act as the snapshot
// repository and installs a global test configuration pointing at it.
// It panics if the temporary directory cannot be created, since no test
// can proceed without it.
func mockConfig() {
	tmpRepository, err := ioutil.TempDir("", "snaprd_testing")
	if err != nil {
		// Propagate the underlying error instead of a generic message so a
		// broken test environment is diagnosable.
		panic(err)
	}
	config = &Config{
		repository: tmpRepository,
		Schedule:   "testing2",
		MaxKeep:    2,
		NoPurge:    false,
		SchedFile:  "testdata/snaprd.schedules",
	}
}
// mockRepository populates the temporary test repository with a fixed set
// of fake "complete" snapshot directories (see mockSnapshots). It panics on
// failure: previously the MkdirAll error was silently ignored, which would
// let later assertions fail with misleading messages.
func mockRepository() {
	for _, s := range mockSnapshots {
		if err := os.MkdirAll(filepath.Join(config.repository, dataSubdir, s), 0777); err != nil {
			panic(err)
		}
	}
}
// assertSnapshotChanLen fails the test when the number of snapshots buffered
// in channel c differs from want.
func assertSnapshotChanLen(t *testing.T, c chan *snapshot, want int) {
	got := len(c)
	if got == want {
		return
	}
	t.Errorf("channel %v contains %v snapshots, wanted %v", c, got, want)
}
// assertSnapshotChanItem receives one snapshot from channel c and fails the
// test if its string representation does not match want.
func assertSnapshotChanItem(t *testing.T, c chan *snapshot, want string) {
	got := (<-c).String()
	if got != want {
		t.Errorf("prune() obsoleted %v, wanted %v", got, want)
	}
}
// pruneTestPair describes one step of the prune() test: how far to advance
// the mock clock and which snapshots are expected to be obsoleted afterwards.
type pruneTestPair struct {
	iteration time.Duration // clock advance applied before running prune()
	obsoleted []string      // expected obsoleted snapshots, in channel order
}
// TestPrune runs prune() against a mocked repository while stepping a skewed
// clock forward, verifying at each step both how many and which snapshots
// get obsoleted and enqueued.
func TestPrune(t *testing.T) {
	// Silence snaprd's logging during the test run.
	log.SetOutput(ioutil.Discard)
	mockConfig()
	mockRepository()
	// NOTE(review): a possible error from addFromFile is ignored here —
	// a missing schedule file would only surface in later assertions.
	schedules.addFromFile(config.SchedFile)
	defer os.RemoveAll(config.repository)
	cl := newSkewClock(startAt)
	// Buffered channel large enough to hold everything prune() obsoletes.
	c := make(chan *snapshot, 100)
	tests := []pruneTestPair{
		{0,
			[]string{},
		},
		{schedules[config.Schedule][0],
			[]string{
				"1400337706-1400337707 Obsolete",
			},
		},
		{schedules[config.Schedule][0] * 10,
			[]string{
				"1400337716-1400337717 Obsolete",
				"1400337711-1400337712 Obsolete",
				"1400337691-1400337692 Obsolete",
			},
		},
		{schedules[config.Schedule][0] * 20,
			[]string{
				"1400337531-1400337532 Obsolete",
				"1400337721-1400337722 Obsolete",
				"1400337611-1400337612 Obsolete",
				"1400337671-1400337672 Obsolete",
			},
		},
	}
	for _, pair := range tests {
		// Advance the mock clock, prune, then check the count and identity
		// of the snapshots that were enqueued as obsolete.
		cl.forward(pair.iteration)
		prune(c, cl)
		assertSnapshotChanLen(t, c, len(pair.obsoleted))
		for _, snS := range pair.obsoleted {
			assertSnapshotChanItem(t, c, snS)
		}
	}
}
/* See the file "LICENSE.txt" for the full license governing this code. */
package main
import (
"bytes"
"io"
"sync"
)
// RingIO is an io.Writer proxy that forwards every write to another writer
// while also keeping the most recent writes in a fixed-size ring buffer.
type RingIO struct {
	out     io.Writer      // the io we are proxying
	maxLen  int            // max number of lines kept in the ring
	maxElem int            // max length in bytes of a single stored line
	mu      *sync.Mutex    // guards buf and p
	buf     map[int][]byte // a map holding the lines, keyed by ring slot
	p       int            // points to the current (next write) item in the map
}
// newRingIO creates a RingIO, which satisfies io.Writer.
// out is the io.Writer that ultimately receives every write unmodified.
// maxLen is the maximum number of elements kept in the ring buffer; once it
// is reached, each Write() overwrites the oldest element.
// maxElem is the maximum size in bytes of an individual stored element.
func newRingIO(out io.Writer, maxLen int, maxElem int) *RingIO {
	r := RingIO{
		out:     out,
		maxLen:  maxLen,
		maxElem: maxElem,
		mu:      new(sync.Mutex),
		buf:     make(map[int][]byte),
		// p starts at its zero value: the first write goes into slot 0.
	}
	return &r
}
// Write implements io.Writer. The data is forwarded to the proxied writer
// and a copy is stored in the ring buffer, truncated to maxElem bytes if
// necessary. The byte count and error of the underlying write are returned:
// previously the proxied writer's error was silently discarded and
// (len(s), nil) was always reported, violating the io.Writer contract.
func (r *RingIO) Write(s []byte) (int, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	// Copy the slice because the caller may reuse it after Write returns.
	c := make([]byte, len(s))
	copy(c, s)
	// Forward to the io.Writer we are proxying and keep its result.
	n, err := r.out.Write(c)
	var e []byte
	// If needed, truncate the new entry to maxElem bytes and append a
	// newline so truncated lines still terminate cleanly.
	if len(c) > r.maxElem {
		e = append(c[0:r.maxElem], byte('\n'))
	} else {
		e = c
	}
	r.buf[r.p] = e
	// Advance the slot pointer, wrapping around once maxLen is reached.
	if r.p < r.maxLen-1 {
		r.p++
	} else {
		r.p = 0
	}
	return n, err
}
// GetAll returns all elements of the ring buffer as a slice of byte slices
func (r *RingIO) GetAll() [][]byte {
r.mu.Lock()
defer r.mu.Unlock()
var ret [][]byte
// return buf, but starting from where the pointer currently points to
for i := r.p; i < r.maxLen; i += 1 {
ret = append(ret, r.buf[i])
}
for i := 0; i < r.p; i += 1 {
ret = append(ret, r.buf[i])
}
return ret
}
// GetAsText concatenates all buffered lines into one byte slice
// GetAsText concatenates all buffered lines into a single byte slice,
// in the same order GetAll returns them.
func (r *RingIO) GetAsText() []byte {
	var out []byte
	for _, line := range r.GetAll() {
		out = append(out, line...)
	}
	return out
}
/* See the file "LICENSE.txt" for the full license governing this code. */
package main
import (
"bytes"
"reflect"
"testing"
)
// rioTestPair describes one RingIO test case: the constructor parameters,
// the sequence of writes to perform, and the expected concatenated buffer
// content afterwards.
type rioTestPair struct {
	params [2]int // {maxLen, maxElem} passed to newRingIO
	in     [][]byte // writes performed in order
	out    []byte // expected GetAsText() result
}
func TestRingIO(t *testing.T) {
tests := []rioTestPair{
{
[2]int{3, 12},
[][]byte{
[]byte("a string\n"),
[]byte("another string\n"),
[]byte("something\n"),
[]byte("else\n"),
},
[]byte("another stri\nsomething\nelse\n"),
},
{