Merge pull request #1555 from bakins/bakins/zfs-filesystem-watcher
Add watcher for zfs similar to devicemapper
This commit is contained in:
commit
1dfc7a17eb
@ -33,6 +33,7 @@ import (
|
|||||||
"github.com/google/cadvisor/machine"
|
"github.com/google/cadvisor/machine"
|
||||||
"github.com/google/cadvisor/manager/watcher"
|
"github.com/google/cadvisor/manager/watcher"
|
||||||
dockerutil "github.com/google/cadvisor/utils/docker"
|
dockerutil "github.com/google/cadvisor/utils/docker"
|
||||||
|
"github.com/google/cadvisor/zfs"
|
||||||
|
|
||||||
docker "github.com/docker/engine-api/client"
|
docker "github.com/docker/engine-api/client"
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
@ -105,6 +106,8 @@ type dockerFactory struct {
|
|||||||
ignoreMetrics container.MetricSet
|
ignoreMetrics container.MetricSet
|
||||||
|
|
||||||
thinPoolWatcher *devicemapper.ThinPoolWatcher
|
thinPoolWatcher *devicemapper.ThinPoolWatcher
|
||||||
|
|
||||||
|
zfsWatcher *zfs.ZfsWatcher
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *dockerFactory) String() string {
|
func (self *dockerFactory) String() string {
|
||||||
@ -132,6 +135,7 @@ func (self *dockerFactory) NewContainerHandler(name string, inHostNamespace bool
|
|||||||
self.dockerVersion,
|
self.dockerVersion,
|
||||||
self.ignoreMetrics,
|
self.ignoreMetrics,
|
||||||
self.thinPoolWatcher,
|
self.thinPoolWatcher,
|
||||||
|
self.zfsWatcher,
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -218,6 +222,21 @@ func startThinPoolWatcher(dockerInfo *dockertypes.Info) (*devicemapper.ThinPoolW
|
|||||||
return thinPoolWatcher, nil
|
return thinPoolWatcher, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func startZfsWatcher(dockerInfo *dockertypes.Info) (*zfs.ZfsWatcher, error) {
|
||||||
|
filesystem, err := dockerutil.DockerZfsFilesystem(*dockerInfo)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
zfsWatcher, err := zfs.NewZfsWatcher(filesystem)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
go zfsWatcher.Start()
|
||||||
|
return zfsWatcher, nil
|
||||||
|
}
|
||||||
|
|
||||||
func ensureThinLsKernelVersion(kernelVersion string) error {
|
func ensureThinLsKernelVersion(kernelVersion string) error {
|
||||||
// kernel 4.4.0 has the proper bug fixes to allow thin_ls to work without corrupting the thin pool
|
// kernel 4.4.0 has the proper bug fixes to allow thin_ls to work without corrupting the thin pool
|
||||||
minKernelVersion := semver.MustParse("4.4.0")
|
minKernelVersion := semver.MustParse("4.4.0")
|
||||||
@ -306,6 +325,14 @@ func Register(factory info.MachineInfoFactory, fsInfo fs.FsInfo, ignoreMetrics c
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var zfsWatcher *zfs.ZfsWatcher
|
||||||
|
if storageDriver(dockerInfo.Driver) == zfsStorageDriver {
|
||||||
|
zfsWatcher, err = startZfsWatcher(dockerInfo)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("zfs filesystem stats will not be reported: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
glog.Infof("Registering Docker factory")
|
glog.Infof("Registering Docker factory")
|
||||||
f := &dockerFactory{
|
f := &dockerFactory{
|
||||||
cgroupSubsystems: cgroupSubsystems,
|
cgroupSubsystems: cgroupSubsystems,
|
||||||
@ -317,6 +344,7 @@ func Register(factory info.MachineInfoFactory, fsInfo fs.FsInfo, ignoreMetrics c
|
|||||||
storageDir: RootDir(),
|
storageDir: RootDir(),
|
||||||
ignoreMetrics: ignoreMetrics,
|
ignoreMetrics: ignoreMetrics,
|
||||||
thinPoolWatcher: thinPoolWatcher,
|
thinPoolWatcher: thinPoolWatcher,
|
||||||
|
zfsWatcher: zfsWatcher,
|
||||||
}
|
}
|
||||||
|
|
||||||
container.RegisterContainerHandlerFactory(f, []watcher.ContainerWatchSource{watcher.Raw})
|
container.RegisterContainerHandlerFactory(f, []watcher.ContainerWatchSource{watcher.Raw})
|
||||||
|
@ -29,6 +29,7 @@ import (
|
|||||||
"github.com/google/cadvisor/fs"
|
"github.com/google/cadvisor/fs"
|
||||||
info "github.com/google/cadvisor/info/v1"
|
info "github.com/google/cadvisor/info/v1"
|
||||||
dockerutil "github.com/google/cadvisor/utils/docker"
|
dockerutil "github.com/google/cadvisor/utils/docker"
|
||||||
|
"github.com/google/cadvisor/zfs"
|
||||||
|
|
||||||
docker "github.com/docker/engine-api/client"
|
docker "github.com/docker/engine-api/client"
|
||||||
dockercontainer "github.com/docker/engine-api/types/container"
|
dockercontainer "github.com/docker/engine-api/types/container"
|
||||||
@ -42,6 +43,7 @@ import (
|
|||||||
const (
|
const (
|
||||||
// The read write layers exist here.
|
// The read write layers exist here.
|
||||||
aufsRWLayer = "diff"
|
aufsRWLayer = "diff"
|
||||||
|
|
||||||
// Path to the directory where docker stores log files if the json logging driver is enabled.
|
// Path to the directory where docker stores log files if the json logging driver is enabled.
|
||||||
pathToContainersDir = "containers"
|
pathToContainersDir = "containers"
|
||||||
)
|
)
|
||||||
@ -72,6 +74,12 @@ type dockerContainerHandler struct {
|
|||||||
// the devicemapper device id for the container
|
// the devicemapper device id for the container
|
||||||
deviceID string
|
deviceID string
|
||||||
|
|
||||||
|
// zfs Filesystem
|
||||||
|
zfsFilesystem string
|
||||||
|
|
||||||
|
// zfsParent is the parent for docker zfs
|
||||||
|
zfsParent string
|
||||||
|
|
||||||
// Time at which this container was created.
|
// Time at which this container was created.
|
||||||
creationTime time.Time
|
creationTime time.Time
|
||||||
|
|
||||||
@ -101,6 +109,9 @@ type dockerContainerHandler struct {
|
|||||||
|
|
||||||
// thin pool watcher
|
// thin pool watcher
|
||||||
thinPoolWatcher *devicemapper.ThinPoolWatcher
|
thinPoolWatcher *devicemapper.ThinPoolWatcher
|
||||||
|
|
||||||
|
// zfs watcher
|
||||||
|
zfsWatcher *zfs.ZfsWatcher
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ container.ContainerHandler = &dockerContainerHandler{}
|
var _ container.ContainerHandler = &dockerContainerHandler{}
|
||||||
@ -136,6 +147,7 @@ func newDockerContainerHandler(
|
|||||||
dockerVersion []int,
|
dockerVersion []int,
|
||||||
ignoreMetrics container.MetricSet,
|
ignoreMetrics container.MetricSet,
|
||||||
thinPoolWatcher *devicemapper.ThinPoolWatcher,
|
thinPoolWatcher *devicemapper.ThinPoolWatcher,
|
||||||
|
zfsWatcher *zfs.ZfsWatcher,
|
||||||
) (container.ContainerHandler, error) {
|
) (container.ContainerHandler, error) {
|
||||||
// Create the cgroup paths.
|
// Create the cgroup paths.
|
||||||
cgroupPaths := make(map[string]string, len(cgroupSubsystems.MountPoints))
|
cgroupPaths := make(map[string]string, len(cgroupSubsystems.MountPoints))
|
||||||
@ -172,12 +184,21 @@ func newDockerContainerHandler(
|
|||||||
var (
|
var (
|
||||||
rootfsStorageDir string
|
rootfsStorageDir string
|
||||||
poolName string
|
poolName string
|
||||||
|
zfsFilesystem string
|
||||||
|
zfsParent string
|
||||||
)
|
)
|
||||||
switch storageDriver {
|
switch storageDriver {
|
||||||
case aufsStorageDriver:
|
case aufsStorageDriver:
|
||||||
rootfsStorageDir = path.Join(storageDir, string(aufsStorageDriver), aufsRWLayer, rwLayerID)
|
rootfsStorageDir = path.Join(storageDir, string(aufsStorageDriver), aufsRWLayer, rwLayerID)
|
||||||
case overlayStorageDriver:
|
case overlayStorageDriver:
|
||||||
rootfsStorageDir = path.Join(storageDir, string(overlayStorageDriver), rwLayerID)
|
rootfsStorageDir = path.Join(storageDir, string(overlayStorageDriver), rwLayerID)
|
||||||
|
case zfsStorageDriver:
|
||||||
|
status, err := Status()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("unable to determine docker status: %v", err)
|
||||||
|
}
|
||||||
|
zfsParent = status.DriverStatus[dockerutil.DriverStatusParentDataset]
|
||||||
|
zfsFilesystem = path.Join(zfsParent, rwLayerID)
|
||||||
case devicemapperStorageDriver:
|
case devicemapperStorageDriver:
|
||||||
status, err := Status()
|
status, err := Status()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -199,10 +220,13 @@ func newDockerContainerHandler(
|
|||||||
fsInfo: fsInfo,
|
fsInfo: fsInfo,
|
||||||
rootFs: rootFs,
|
rootFs: rootFs,
|
||||||
poolName: poolName,
|
poolName: poolName,
|
||||||
|
zfsFilesystem: zfsFilesystem,
|
||||||
rootfsStorageDir: rootfsStorageDir,
|
rootfsStorageDir: rootfsStorageDir,
|
||||||
envs: make(map[string]string),
|
envs: make(map[string]string),
|
||||||
ignoreMetrics: ignoreMetrics,
|
ignoreMetrics: ignoreMetrics,
|
||||||
thinPoolWatcher: thinPoolWatcher,
|
thinPoolWatcher: thinPoolWatcher,
|
||||||
|
zfsWatcher: zfsWatcher,
|
||||||
|
zfsParent: zfsParent,
|
||||||
}
|
}
|
||||||
|
|
||||||
// We assume that if Inspect fails then the container is not known to docker.
|
// We assume that if Inspect fails then the container is not known to docker.
|
||||||
@ -245,7 +269,9 @@ func newDockerContainerHandler(
|
|||||||
handler.fsHandler = &dockerFsHandler{
|
handler.fsHandler = &dockerFsHandler{
|
||||||
fsHandler: common.NewFsHandler(common.DefaultPeriod, rootfsStorageDir, otherStorageDir, fsInfo),
|
fsHandler: common.NewFsHandler(common.DefaultPeriod, rootfsStorageDir, otherStorageDir, fsInfo),
|
||||||
thinPoolWatcher: thinPoolWatcher,
|
thinPoolWatcher: thinPoolWatcher,
|
||||||
|
zfsWatcher: zfsWatcher,
|
||||||
deviceID: handler.deviceID,
|
deviceID: handler.deviceID,
|
||||||
|
zfsFilesystem: zfsFilesystem,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -265,7 +291,7 @@ func newDockerContainerHandler(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// dockerFsHandler is a composite FsHandler implementation the incorporates
|
// dockerFsHandler is a composite FsHandler implementation the incorporates
|
||||||
// the common fs handler and a devicemapper ThinPoolWatcher.
|
// the common fs handler, a devicemapper ThinPoolWatcher, and a zfsWatcher
|
||||||
type dockerFsHandler struct {
|
type dockerFsHandler struct {
|
||||||
fsHandler common.FsHandler
|
fsHandler common.FsHandler
|
||||||
|
|
||||||
@ -273,6 +299,11 @@ type dockerFsHandler struct {
|
|||||||
thinPoolWatcher *devicemapper.ThinPoolWatcher
|
thinPoolWatcher *devicemapper.ThinPoolWatcher
|
||||||
// deviceID is the id of the container's fs device
|
// deviceID is the id of the container's fs device
|
||||||
deviceID string
|
deviceID string
|
||||||
|
|
||||||
|
// zfsWatcher is the zfs filesystem watcher
|
||||||
|
zfsWatcher *zfs.ZfsWatcher
|
||||||
|
// zfsFilesystem is the docker zfs filesystem
|
||||||
|
zfsFilesystem string
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ common.FsHandler = &dockerFsHandler{}
|
var _ common.FsHandler = &dockerFsHandler{}
|
||||||
@ -306,6 +337,15 @@ func (h *dockerFsHandler) Usage() common.FsUsage {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if h.zfsWatcher != nil {
|
||||||
|
zfsUsage, err := h.zfsWatcher.GetUsage(h.zfsFilesystem)
|
||||||
|
if err != nil {
|
||||||
|
glog.V(5).Infof("unable to get fs usage from zfs for filesystem %s: %v", h.zfsFilesystem, err)
|
||||||
|
} else {
|
||||||
|
usage.BaseUsageBytes = zfsUsage
|
||||||
|
usage.TotalUsageBytes += zfsUsage
|
||||||
|
}
|
||||||
|
}
|
||||||
return usage
|
return usage
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -359,12 +399,14 @@ func (self *dockerContainerHandler) getFsStats(stats *info.ContainerStats) error
|
|||||||
// Device has to be the pool name to correlate with the device name as
|
// Device has to be the pool name to correlate with the device name as
|
||||||
// set in the machine info filesystems.
|
// set in the machine info filesystems.
|
||||||
device = self.poolName
|
device = self.poolName
|
||||||
case aufsStorageDriver, overlayStorageDriver, zfsStorageDriver:
|
case aufsStorageDriver, overlayStorageDriver:
|
||||||
deviceInfo, err := self.fsInfo.GetDirFsDevice(self.rootfsStorageDir)
|
deviceInfo, err := self.fsInfo.GetDirFsDevice(self.rootfsStorageDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to determine device info for dir: %v: %v", self.rootfsStorageDir, err)
|
return fmt.Errorf("unable to determine device info for dir: %v: %v", self.rootfsStorageDir, err)
|
||||||
}
|
}
|
||||||
device = deviceInfo.Device
|
device = deviceInfo.Device
|
||||||
|
case zfsStorageDriver:
|
||||||
|
device = self.zfsParent
|
||||||
default:
|
default:
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -23,11 +23,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
DockerInfoDriver = "Driver"
|
DockerInfoDriver = "Driver"
|
||||||
DockerInfoDriverStatus = "DriverStatus"
|
DockerInfoDriverStatus = "DriverStatus"
|
||||||
DriverStatusPoolName = "Pool Name"
|
DriverStatusPoolName = "Pool Name"
|
||||||
DriverStatusDataLoopFile = "Data loop file"
|
DriverStatusDataLoopFile = "Data loop file"
|
||||||
DriverStatusMetadataFile = "Metadata file"
|
DriverStatusMetadataFile = "Metadata file"
|
||||||
|
DriverStatusParentDataset = "Parent Dataset"
|
||||||
)
|
)
|
||||||
|
|
||||||
func DriverStatusValue(status [][2]string, target string) string {
|
func DriverStatusValue(status [][2]string, target string) string {
|
||||||
@ -68,3 +69,12 @@ func DockerMetadataDevice(info dockertypes.Info) (string, error) {
|
|||||||
|
|
||||||
return metadataDevice, nil
|
return metadataDevice, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func DockerZfsFilesystem(info dockertypes.Info) (string, error) {
|
||||||
|
filesystem := DriverStatusValue(info.DriverStatus, DriverStatusParentDataset)
|
||||||
|
if len(filesystem) == 0 {
|
||||||
|
return "", fmt.Errorf("Could not get zfs filesystem")
|
||||||
|
}
|
||||||
|
|
||||||
|
return filesystem, nil
|
||||||
|
}
|
||||||
|
113
zfs/watcher.go
Normal file
113
zfs/watcher.go
Normal file
@ -0,0 +1,113 @@
|
|||||||
|
// Copyright 2016 Google Inc. All Rights Reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
package zfs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
zfs "github.com/mistifyio/go-zfs"
|
||||||
|
)
|
||||||
|
|
||||||
|
// zfsWatcher maintains a cache of filesystem -> usage stats for a
|
||||||
|
// zfs filesystem
|
||||||
|
type ZfsWatcher struct {
|
||||||
|
filesystem string
|
||||||
|
lock *sync.RWMutex
|
||||||
|
cache map[string]uint64
|
||||||
|
period time.Duration
|
||||||
|
stopChan chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewThinPoolWatcher returns a new ThinPoolWatcher for the given devicemapper
|
||||||
|
// thin pool name and metadata device or an error.
|
||||||
|
func NewZfsWatcher(filesystem string) (*ZfsWatcher, error) {
|
||||||
|
|
||||||
|
return &ZfsWatcher{
|
||||||
|
filesystem: filesystem,
|
||||||
|
lock: &sync.RWMutex{},
|
||||||
|
cache: make(map[string]uint64),
|
||||||
|
period: 15 * time.Second,
|
||||||
|
stopChan: make(chan struct{}),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start starts the ZfsWatcher.
|
||||||
|
func (w *ZfsWatcher) Start() {
|
||||||
|
err := w.Refresh()
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("encountered error refreshing zfs watcher: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-w.stopChan:
|
||||||
|
return
|
||||||
|
case <-time.After(w.period):
|
||||||
|
start := time.Now()
|
||||||
|
err = w.Refresh()
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("encountered error refreshing zfs watcher: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// print latency for refresh
|
||||||
|
duration := time.Since(start)
|
||||||
|
glog.V(5).Infof("zfs(%d) took %s", start.Unix(), duration)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop stops the ZfsWatcher.
|
||||||
|
func (w *ZfsWatcher) Stop() {
|
||||||
|
close(w.stopChan)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetUsage gets the cached usage value of the given filesystem.
|
||||||
|
func (w *ZfsWatcher) GetUsage(filesystem string) (uint64, error) {
|
||||||
|
w.lock.RLock()
|
||||||
|
defer w.lock.RUnlock()
|
||||||
|
|
||||||
|
v, ok := w.cache[filesystem]
|
||||||
|
if !ok {
|
||||||
|
return 0, fmt.Errorf("no cached value for usage of filesystem %v", filesystem)
|
||||||
|
}
|
||||||
|
|
||||||
|
return v, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Refresh performs a zfs get
|
||||||
|
func (w *ZfsWatcher) Refresh() error {
|
||||||
|
w.lock.Lock()
|
||||||
|
defer w.lock.Unlock()
|
||||||
|
newCache := make(map[string]uint64)
|
||||||
|
parent, err := zfs.GetDataset(w.filesystem)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("encountered error getting zfs filesystem: %s: %v", w.filesystem, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
children, err := parent.Children(0)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("encountered error getting children of zfs filesystem: %s: %v", w.filesystem, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ds := range children {
|
||||||
|
newCache[ds.Name] = ds.Used
|
||||||
|
}
|
||||||
|
|
||||||
|
w.cache = newCache
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user