From aeee52161c6ef425f7ff9f642b16ce72f7cbee1c Mon Sep 17 00:00:00 2001
From: Rohit Jnagal
Date: Tue, 13 Jan 2015 23:13:07 +0000
Subject: [PATCH] Add a utility to read cpu load stats.

---
 utils/cpuload/conn.go            |  92 ++++++++++++
 utils/cpuload/cpuload.go         |  90 ++++++++++++
 utils/cpuload/defs.go            |  26 ++++
 utils/cpuload/example/example.go |  39 +++++
 utils/cpuload/netlink.go         | 236 +++++++++++++++++++++++++++++++
 5 files changed, 483 insertions(+)
 create mode 100644 utils/cpuload/conn.go
 create mode 100644 utils/cpuload/cpuload.go
 create mode 100644 utils/cpuload/defs.go
 create mode 100644 utils/cpuload/example/example.go
 create mode 100644 utils/cpuload/netlink.go

diff --git a/utils/cpuload/conn.go b/utils/cpuload/conn.go
new file mode 100644
index 00000000..d57b0e8b
--- /dev/null
+++ b/utils/cpuload/conn.go
@@ -0,0 +1,92 @@
+// Copyright 2015 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cpuload
+
+import (
+	"bufio"
+	"bytes"
+	"encoding/binary"
+	"os"
+	"syscall"
+)
+
+// Connection is a netlink socket together with the per-connection state
+// (pid, sequence number, address, read buffer) that the message helpers
+// in this file use when talking to the kernel.
+type Connection struct {
+	fd   int                     // netlink socket
+	pid  uint32                  // cached pid, stamped on every netlink request
+	seq  uint32                  // sequence number given to the next netlink message
+	addr syscall.SockaddrNetlink // address the socket is bound to and writes are sent to
+	rbuf *bufio.Reader           // buffered reader layered over this connection
+}
+
+// Create and bind a new netlink socket.
+func newConnection() (*Connection, error) {
+	fd, err := syscall.Socket(syscall.AF_NETLINK, syscall.SOCK_DGRAM, syscall.NETLINK_GENERIC)
+	if err != nil {
+		return nil, err
+	}
+
+	// seq starts at its zero value; pid is cached once for all later requests.
+	conn := &Connection{fd: fd, pid: uint32(os.Getpid())}
+	conn.addr.Family = syscall.AF_NETLINK
+	conn.rbuf = bufio.NewReader(conn)
+	if err := syscall.Bind(fd, &conn.addr); err != nil {
+		// The socket is not handed to the caller on failure, so release it here.
+		syscall.Close(fd)
+		return nil, err
+	}
+	return conn, nil
+}
+
+// Read receives from the netlink socket into b and returns the number of
+// bytes stored. The sender address reported by Recvfrom is discarded.
+func (c *Connection) Read(b []byte) (n int, err error) {
+	n, _, err = syscall.Recvfrom(c.fd, b, 0)
+	if err != nil {
+		// On failure the raw syscall count is negative. io.Reader callers
+		// expect n >= 0, and the bufio.Reader wrapped around this
+		// connection panics on a negative count.
+		return 0, err
+	}
+	return n, nil
+}
+
+// Write sends b as a single message to the address stored in the connection.
+// It returns len(b) on success and 0 together with the error on failure.
+func (c *Connection) Write(b []byte) (n int, err error) {
+	if err = syscall.Sendto(c.fd, b, 0, &c.addr); err != nil {
+		// Sendto reports no partial count, so nothing can be claimed as written.
+		return 0, err
+	}
+	return len(b), nil
+}
+
+// Close closes the underlying netlink socket.
+func (c *Connection) Close() error {
+	return syscall.Close(c.fd)
+}
+
+// WriteMessage fills in the length, sequence number and pid of msg's header,
+// serializes header and data in little-endian order and sends the result.
+// The sequence number is advanced whether or not the send succeeds.
+func (c *Connection) WriteMessage(msg syscall.NetlinkMessage) error {
+	msg.Header.Len = uint32(syscall.NLMSG_HDRLEN + len(msg.Data))
+	msg.Header.Seq = c.seq
+	c.seq++
+	msg.Header.Pid = c.pid
+
+	w := bytes.NewBuffer(nil)
+	if err := binary.Write(w, binary.LittleEndian, msg.Header); err != nil {
+		return err
+	}
+	if _, err := w.Write(msg.Data); err != nil {
+		return err
+	}
+	_, err := c.Write(w.Bytes())
+	return err
+}
+
+// ReadMessage reads one netlink message: the fixed-size header first, then
+// Header.Len minus the header size bytes of payload.
+// It returns an error if the header cannot be read, if the advertised length
+// is smaller than a header (syscall.EINVAL), or if the payload is cut short.
+func (c *Connection) ReadMessage() (msg syscall.NetlinkMessage, err error) {
+	if err = binary.Read(c.rbuf, binary.LittleEndian, &msg.Header); err != nil {
+		return msg, err
+	}
+	// Header.Len is unsigned: subtracting the header size from a smaller
+	// value would wrap around and request an enormous allocation.
+	if msg.Header.Len < syscall.NLMSG_HDRLEN {
+		return msg, syscall.EINVAL
+	}
+	msg.Data = make([]byte, msg.Header.Len-syscall.NLMSG_HDRLEN)
+	// binary.Read fills the whole slice or fails, unlike a single Read call
+	// which may legitimately return fewer bytes than requested.
+	err = binary.Read(c.rbuf, binary.LittleEndian, msg.Data)
+	return msg, err
+}
diff --git a/utils/cpuload/cpuload.go b/utils/cpuload/cpuload.go
new file mode 100644
index 00000000..29f79ca7
--- /dev/null
+++ b/utils/cpuload/cpuload.go
@@ -0,0 +1,90 @@
+// Copyright 2015 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cpuload
+
+import (
+	"fmt"
+	"os"
+
+	"github.com/golang/glog"
+)
+
+// CpuLoadReader queries per-cgroup load statistics over a netlink connection.
+type CpuLoadReader struct {
+	// familyId is the netlink family id looked up once in New and reused
+	// for every load query.
+	familyId uint16
+	// conn is the netlink connection shared by all queries.
+	conn *Connection
+}
+
+// New opens a netlink connection and looks up the taskstats family id.
+// It returns an error if the connection cannot be created or the lookup
+// fails; in the latter case the connection is closed before returning.
+func New() (*CpuLoadReader, error) {
+	conn, err := newConnection()
+	if err != nil {
+		return nil, fmt.Errorf("failed to create a new connection: %s", err)
+	}
+
+	id, err := getFamilyId(conn)
+	if err != nil {
+		// The connection never reaches the caller on this path, so it has
+		// to be released here or the socket leaks.
+		conn.Close()
+		return nil, fmt.Errorf("failed to get netlink family id for task stats: %s", err)
+	}
+	glog.V(2).Infof("Family id for taskstats: %d", id)
+	return &CpuLoadReader{
+		familyId: id,
+		conn:     conn,
+	}, nil
+}
+
+// Close releases the netlink connection, if there is one.
+func (self *CpuLoadReader) Close() {
+	if self.conn != nil {
+		self.conn.Close()
+	}
+}
+
+// This mirrors kernel internal structure.
+// parseLoadStatsResp decodes the response payload straight into this struct
+// with binary.Read, so field order and widths must not be changed.
+type LoadStats struct {
+	// Number of sleeping tasks.
+	NrSleeping uint64 `json:"nr_sleeping"`
+
+	// Number of running tasks.
+	NrRunning uint64 `json:"nr_running"`
+
+	// Number of tasks in stopped state
+	NrStopped uint64 `json:"nr_stopped"`
+
+	// Number of tasks in uninterruptible state
+	NrUinterruptible uint64 `json:"nr_uninterruptible"`
+
+	// Number of tasks waiting on IO
+	NrIoWait uint64 `json:"nr_io_wait"`
+}
+
+// Returns instantaneous number of running tasks in a group.
+// Caller can use historical data to calculate cpu load.
+// path is an absolute filesystem path for a container under the CPU cgroup hierarchy.
+// NOTE: non-hierarchical load is returned. It does not include load for subcontainers.
+func (self *CpuLoadReader) GetCpuLoad(path string) (LoadStats, error) {
+	if len(path) == 0 {
+		return LoadStats{}, fmt.Errorf("cgroup path can not be empty!")
+	}
+
+	cfd, err := os.Open(path)
+	if err != nil {
+		return LoadStats{}, fmt.Errorf("failed to open cgroup path %s: %q", path, err)
+	}
+	// The descriptor is only needed while the netlink query is in flight.
+	// Without this, every call leaves one more file open.
+	defer cfd.Close()
+
+	stats, err := getLoadStats(self.familyId, cfd.Fd(), self.conn)
+	if err != nil {
+		return LoadStats{}, err
+	}
+	glog.V(1).Infof("Task stats for %q: %+v", path, stats)
+	return stats, nil
+}
diff --git a/utils/cpuload/defs.go b/utils/cpuload/defs.go
new file mode 100644
index 00000000..3aa25d93
--- /dev/null
+++ b/utils/cpuload/defs.go
@@ -0,0 +1,26 @@
+// Copyright 2015 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cpuload
+
+/*
+#include <linux/taskstats.h>
+*/
+import "C"
+
+// TaskStats is the Go view of the C struct taskstats from the header above.
+type TaskStats C.struct_taskstats
+
+const (
+	// Re-exported from C so the Go code can derive command numbers from it.
+	__TASKSTATS_CMD_MAX = C.__TASKSTATS_CMD_MAX
+)
diff --git a/utils/cpuload/example/example.go b/utils/cpuload/example/example.go
new file mode 100644
index 00000000..d07a307e
--- /dev/null
+++ b/utils/cpuload/example/example.go
@@ -0,0 +1,39 @@
+// Copyright 2015 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"log"
+
+	"github.com/google/cadvisor/utils/cpuload"
+)
+
+// main creates a cpu load reader and logs the load stats for two cgroup paths.
+func main() {
+	c, err := cpuload.New()
+	if err != nil {
+		log.Printf("Failed to create cpu load util: %s", err)
+		return
+	}
+	defer c.Close()
+
+	paths := []string{"/sys/fs/cgroup/cpu", "/sys/fs/cgroup/cpu/docker"}
+	for _, path := range paths {
+		stats, err := c.GetCpuLoad(path)
+		if err != nil {
+			log.Printf("Error getting cpu load for %q: %s", path, err)
+			// stats is the zero value here; logging it would look like a
+			// real (all-zero) measurement, so skip to the next path.
+			continue
+		}
+		log.Printf("Task load for %s: %+v", path, stats)
+	}
+}
diff --git a/utils/cpuload/netlink.go b/utils/cpuload/netlink.go
new file mode 100644
index 00000000..2d810449
--- /dev/null
+++ b/utils/cpuload/netlink.go
@@ -0,0 +1,236 @@
+// Copyright 2015 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cpuload
+
+import (
+	"bytes"
+	"encoding/binary"
+	"fmt"
+	"syscall"
+)
+
+const (
+	// Kernel constants for tasks stats.
+	genlIdCtrl           = syscall.NLMSG_MIN_TYPE // GENL_ID_CTRL
+	taskstatsGenlName    = "TASKSTATS"            // TASKSTATS_GENL_NAME
+	cgroupStatsCmdAttrFd = 0x1                    // CGROUPSTATS_CMD_ATTR_FD
+	ctrlAttrFamilyId     = 0x1                    // CTRL_ATTR_FAMILY_ID
+	ctrlAttrFamilyName   = 0x2                    // CTRL_ATTR_FAMILY_NAME
+	ctrlCmdGetFamily     = 0x3                    // CTRL_CMD_GETFAMILY
+)
+
+var (
+	// Byte order used for everything encoded or decoded in this file.
+	// TODO(rjnagal): Verify and fix for other architectures.
+	Endian = binary.LittleEndian
+)
+
+// genMsghdr is the generic header that toRawMsg writes in front of the
+// attribute data of every request, and that the parsers read first from
+// every response payload.
+type genMsghdr struct {
+	Command  uint8
+	Version  uint8
+	Reserved uint16
+}
+
+// netlinkMessage is a request in structured form: netlink header, generic
+// header and the already-encoded attributes.
+type netlinkMessage struct {
+	Header    syscall.NlMsghdr
+	GenHeader genMsghdr
+	Data      []byte
+}
+
+// toRawMsg flattens the message into a syscall.NetlinkMessage whose Data is
+// the encoded generic header followed by the attribute bytes.
+func (self netlinkMessage) toRawMsg() (rawmsg syscall.NetlinkMessage) {
+	rawmsg.Header = self.Header
+	w := bytes.NewBuffer([]byte{})
+	// Encoding a fixed-size struct into an in-memory buffer; the results of
+	// these two writes are deliberately not checked.
+	binary.Write(w, Endian, self.GenHeader)
+	w.Write(self.Data)
+	rawmsg.Data = w.Bytes()
+	return rawmsg
+}
+
+// loadStatsResp is a decoded load stats response: both headers plus the
+// stats payload.
+type loadStatsResp struct {
+	Header    syscall.NlMsghdr
+	GenHeader genMsghdr
+	Stats     LoadStats
+}
+
+// Return required padding to align 'size' to 'alignment'.
+// The result is 0 when size is already a multiple of alignment.
+func padding(size int, alignment int) int {
+	unalignedPart := size % alignment
+	return (alignment - unalignedPart) % alignment
+}
+
+// Get family id for taskstats subsystem.
+// Sends the family lookup request on conn and parses the reply. Any error
+// from sending, receiving or parsing is returned with a zero id.
+func getFamilyId(conn *Connection) (uint16, error) {
+	msg := prepareFamilyMessage()
+	if err := conn.WriteMessage(msg.toRawMsg()); err != nil {
+		return 0, err
+	}
+
+	// A failed read must not be parsed as if it were a response.
+	resp, err := conn.ReadMessage()
+	if err != nil {
+		return 0, err
+	}
+	return parseFamilyResp(resp)
+}
+
+// Append an attribute to the message.
+// Adds attribute info (length and type), followed by the data and necessary padding.
+// Can be called multiple times to add attributes. Only fixed size and string type
+// attributes are handled. We don't need nested attributes for task stats.
+func addAttribute(buf *bytes.Buffer, attrType uint16, data interface{}, dataSize int) {
+	// The attribute length covers its own header plus the data, but not the
+	// trailing padding.
+	attr := syscall.RtAttr{
+		Len:  syscall.SizeofRtAttr + uint16(dataSize),
+		Type: attrType,
+	}
+	binary.Write(buf, Endian, attr)
+	switch data := data.(type) {
+	case string:
+		buf.WriteString(data)
+		buf.WriteByte(0) // terminate
+	default:
+		binary.Write(buf, Endian, data)
+	}
+	// The pad width depends only on attr.Len, so compute it once.
+	for pad := padding(int(attr.Len), syscall.NLMSG_ALIGNTO); pad > 0; pad-- {
+		buf.WriteByte(0)
+	}
+}
+
+// Prepares the message and generic headers and appends attributes as data.
+// headerType becomes the netlink message type and cmd the generic header
+// command; the message is flagged as a request with generic version 1.
+func prepareMessage(headerType uint16, cmd uint8, attributes []byte) (msg netlinkMessage) {
+	msg.Header.Type = headerType
+	msg.Header.Flags = syscall.NLM_F_REQUEST
+	msg.GenHeader.Command = cmd
+	msg.GenHeader.Version = 0x1
+	msg.Data = attributes
+	return msg
+}
+
+// Prepares message to query family id for task stats.
+// The request carries a single family-name attribute holding the
+// NUL-terminated taskstats family name.
+func prepareFamilyMessage() (msg netlinkMessage) {
+	buf := bytes.NewBuffer([]byte{})
+	// +1 accounts for the terminating NUL that addAttribute appends to strings.
+	addAttribute(buf, ctrlAttrFamilyName, taskstatsGenlName, len(taskstatsGenlName)+1)
+	return prepareMessage(genlIdCtrl, ctrlCmdGetFamily, buf.Bytes())
+}
+
+// Prepares message to query task stats for a task group.
+// id is the taskstats family id; cfd is sent to the kernel as a 32-bit
+// file descriptor attribute.
+func prepareCmdMessage(id uint16, cfd uintptr) (msg netlinkMessage) {
+	buf := bytes.NewBuffer([]byte{})
+	addAttribute(buf, cgroupStatsCmdAttrFd, uint32(cfd), 4)
+	// NOTE(review): __TASKSTATS_CMD_MAX+1 is presumably CGROUPSTATS_CMD_GET;
+	// confirm against linux/cgroupstats.h.
+	return prepareMessage(id, __TASKSTATS_CMD_MAX+1, buf.Bytes())
+}
+
+// Extracts returned family id from the response.
+// Returns an error if the kernel reported a failure, if the payload is
+// truncated or malformed, or if no family id attribute is present.
+func parseFamilyResp(msg syscall.NetlinkMessage) (uint16, error) {
+	if err := verifyHeader(msg); err != nil {
+		return 0, err
+	}
+	buf := bytes.NewBuffer(msg.Data)
+	// The payload starts with the generic header; decode it to move past it.
+	var genHeader genMsghdr
+	if err := binary.Read(buf, Endian, &genHeader); err != nil {
+		return 0, err
+	}
+	// Extract attributes. kernel reports family name, id, version, etc.
+	// Scan till we find id.
+	for buf.Len() > syscall.SizeofRtAttr {
+		var attr syscall.RtAttr
+		if err := binary.Read(buf, Endian, &attr); err != nil {
+			return 0, err
+		}
+		if attr.Type == ctrlAttrFamilyId {
+			var id uint16
+			if err := binary.Read(buf, Endian, &id); err != nil {
+				return 0, err
+			}
+			return id, nil
+		}
+		payload := int(attr.Len) - syscall.SizeofRtAttr
+		if payload < 0 {
+			// A length shorter than the attribute header would otherwise
+			// reach make() as a negative size and panic.
+			return 0, fmt.Errorf("malformed attribute in family response: length %d is shorter than the attribute header", attr.Len)
+		}
+		// Skip the payload and its padding, using the same alignment that
+		// addAttribute uses when encoding.
+		skipLen := payload + padding(payload, syscall.NLMSG_ALIGNTO)
+		skipped := make([]byte, skipLen)
+		if err := binary.Read(buf, Endian, skipped); err != nil {
+			return 0, err
+		}
+	}
+	return 0, fmt.Errorf("family id not found in the response.")
+}
+
+// Extract task stats from response returned by kernel.
+// The payload is decoded in order: generic header, one attribute header,
+// then the stats themselves.
+func parseLoadStatsResp(msg syscall.NetlinkMessage) (*loadStatsResp, error) {
+	m := new(loadStatsResp)
+	m.Header = msg.Header
+	if err := verifyHeader(msg); err != nil {
+		return m, err
+	}
+	buf := bytes.NewBuffer(msg.Data)
+	// Scan the general header.
+	if err := binary.Read(buf, Endian, &m.GenHeader); err != nil {
+		return m, err
+	}
+	// cgroup stats response should have just one attribute.
+	// Read it directly into the stats structure.
+	var attr syscall.RtAttr
+	if err := binary.Read(buf, Endian, &attr); err != nil {
+		return m, err
+	}
+	if err := binary.Read(buf, Endian, &m.Stats); err != nil {
+		return m, err
+	}
+	return m, nil
+}
+
+// Verify and return any error reported by kernel.
+// Returns nil unless the message type is NLMSG_DONE or NLMSG_ERROR.
+func verifyHeader(msg syscall.NetlinkMessage) error {
+	switch msg.Header.Type {
+	case syscall.NLMSG_DONE:
+		return fmt.Errorf("expected a response, got nil")
+	case syscall.NLMSG_ERROR:
+		var errno int32
+		// binary.Read can only fill a value it is given a pointer to;
+		// passing errno by value would leave it at zero.
+		if err := binary.Read(bytes.NewBuffer(msg.Data), Endian, &errno); err != nil {
+			return fmt.Errorf("netlink request failed and the error code could not be read: %s", err)
+		}
+		return fmt.Errorf("netlink request failed with error %s", syscall.Errno(-errno))
+	}
+	return nil
+}
+
+// Get load stats for a task group.
+// id: family id for taskstats.
+// fd: fd to path to the cgroup directory under cpu hierarchy.
+// conn: open netlink connection used to communicate with kernel.
+func getLoadStats(id uint16, fd uintptr, conn *Connection) (LoadStats, error) {
+	// Zero value handed back on every failure path.
+	var none LoadStats
+
+	// Build the request and send it in its flattened wire form.
+	request := prepareCmdMessage(id, fd).toRawMsg()
+	if err := conn.WriteMessage(request); err != nil {
+		return none, err
+	}
+
+	reply, err := conn.ReadMessage()
+	if err != nil {
+		return none, err
+	}
+
+	// Only the stats are returned; the decoded headers are dropped.
+	parsed, err := parseLoadStatsResp(reply)
+	if err != nil {
+		return none, err
+	}
+	return parsed.Stats, nil
+}