Merge pull request #1292 from timstclair/tail

Fixes for log tailing
This commit is contained in:
Tim St. Clair 2016-05-19 12:16:24 -07:00
commit fb5fb832ed

View File

@ -20,6 +20,7 @@ import (
"fmt"
"io"
"os"
"path/filepath"
"sync"
"time"
@ -37,8 +38,10 @@ type Tail struct {
watcher *inotify.Watcher
}
const retryOpenInterval = time.Second
const maxOpenAttempts = 3
const (
defaultRetryInterval = 100 * time.Millisecond
maxRetryInterval = 30 * time.Second
)
// NewTail starts opens the given file and watches it for deletion/rotation
func NewTail(filename string) (*Tail, error) {
@ -72,31 +75,15 @@ func (t *Tail) Close() {
close(t.stop)
}
func isEvent(event *inotify.Event, flag uint32) bool {
return event.Mask&flag == flag
}
func (t *Tail) fileChanged() error {
for {
select {
case event := <-t.watcher.Event:
// We don't get IN_DELETE because we are holding the file open
if isEvent(event, inotify.IN_ATTRIB) || isEvent(event, inotify.IN_MOVE_SELF) {
return nil
}
case <-t.stop:
return fmt.Errorf("watch was cancelled")
}
}
}
func (t *Tail) attemptOpen() error {
t.readerLock.Lock()
defer t.readerLock.Unlock()
t.reader = nil
t.readerErr = nil
for attempt := 1; attempt <= maxOpenAttempts; attempt++ {
glog.V(4).Infof("Opening %s (attempt %d of %d)", t.filename, attempt, maxOpenAttempts)
attempt := 0
for interval := defaultRetryInterval; ; interval *= 2 {
attempt++
glog.V(4).Infof("Opening %s (attempt %d)", t.filename, attempt)
var err error
t.file, err = os.Open(t.filename)
if err == nil {
@ -105,8 +92,11 @@ func (t *Tail) attemptOpen() error {
t.reader = bufio.NewReader(t.file)
return nil
}
if interval >= maxRetryInterval {
break
}
select {
case <-time.After(retryOpenInterval):
case <-time.After(interval):
case <-t.stop:
t.readerErr = io.EOF
return fmt.Errorf("watch was cancelled")
@ -133,15 +123,24 @@ func (t *Tail) watchFile() error {
return err
}
defer t.file.Close()
err = t.watcher.Watch(t.filename)
watchDir := filepath.Dir(t.filename)
err = t.watcher.AddWatch(watchDir, inotify.IN_MOVED_FROM|inotify.IN_DELETE)
if err != nil {
return err
return fmt.Errorf("Failed to add watch to directory %s: %v", watchDir, err)
}
defer t.watcher.RemoveWatch(t.filename)
err = t.fileChanged()
if err != nil {
return err
defer t.watcher.RemoveWatch(watchDir)
for {
select {
case event := <-t.watcher.Event:
eventPath := filepath.Clean(event.Name) // Directory events have an extra '/'
if eventPath == t.filename {
glog.V(4).Infof("Log file %s moved/deleted", t.filename)
return nil
}
case <-t.stop:
return fmt.Errorf("watch was cancelled")
}
}
glog.V(4).Infof("Log file %s moved/deleted", t.filename)
return nil
}