caddy-log-exporter/vendor/github.com/testcontainers/testcontainers-go/reaper.go

401 lines
14 KiB
Go

package testcontainers
import (
"bufio"
"context"
"fmt"
"math/rand"
"net"
"strings"
"sync"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/errdefs"
"github.com/docker/go-connections/nat"
"github.com/testcontainers/testcontainers-go/internal/config"
"github.com/testcontainers/testcontainers-go/internal/core"
"github.com/testcontainers/testcontainers-go/wait"
)
const (
// Deprecated: it has been replaced by the internal core.LabelLang
TestcontainerLabel = "org.testcontainers.golang"
// Deprecated: it has been replaced by the internal core.LabelSessionID
TestcontainerLabelSessionID = TestcontainerLabel + ".sessionId"
// Deprecated: it has been replaced by the internal core.LabelReaper
TestcontainerLabelIsReaper = TestcontainerLabel + ".reaper"
)
var (
// Deprecated: it has been replaced by an internal value
ReaperDefaultImage = config.ReaperDefaultImage
reaperInstance *Reaper // We would like to create reaper only once
reaperMutex sync.Mutex
reaperOnce sync.Once
)
// ReaperProvider represents a provider for the reaper to run itself with
// The ContainerProvider interface should usually satisfy this as well, so it is pluggable
type ReaperProvider interface {
RunContainer(ctx context.Context, req ContainerRequest) (Container, error)
Config() TestcontainersConfig
}
// NewReaper creates a Reaper with a sessionID to identify containers and a provider to use
// Deprecated: it's not possible to create a reaper anymore. Compose module uses this method
// to create a reaper for the compose stack.
func NewReaper(ctx context.Context, sessionID string, provider ReaperProvider, reaperImageName string) (*Reaper, error) {
return reuseOrCreateReaper(ctx, sessionID, provider)
}
// reaperContainerNameFromSessionID returns the container name that uniquely
// identifies the container based on the session id.
func reaperContainerNameFromSessionID(sessionID string) string {
// The session id is 64 characters, so we will not hit the limit of 128
// characters for container names.
return fmt.Sprintf("reaper_%s", sessionID)
}
// lookUpReaperContainer returns a DockerContainer type with the reaper container in the case
// it's found in the running state, and including the labels for sessionID, reaper, and ryuk.
// It will perform a retry with exponential backoff to allow for the container to be started and
// avoid potential false negatives.
func lookUpReaperContainer(ctx context.Context, sessionID string) (*DockerContainer, error) {
dockerClient, err := NewDockerClientWithOpts(ctx)
if err != nil {
return nil, err
}
defer dockerClient.Close()
// the backoff will take at most 5 seconds to find the reaper container
// doing each attempt every 100ms
exp := backoff.NewExponentialBackOff()
// we want random intervals between 100ms and 500ms for concurrent executions
// to not be synchronized: it could be the case that multiple executions of this
// function happen at the same time (specifically when called from a different test
// process execution), and we want to avoid that they all try to find the reaper
// container at the same time.
exp.InitialInterval = time.Duration(rand.Intn(5)*100) * time.Millisecond
exp.RandomizationFactor = rand.Float64() * 0.5
exp.Multiplier = rand.Float64() * 2.0
exp.MaxInterval = 5.0 * time.Second // max interval between attempts
exp.MaxElapsedTime = 1 * time.Minute // max time to keep trying
opts := container.ListOptions{
All: true,
Filters: filters.NewArgs(
filters.Arg("label", fmt.Sprintf("%s=%s", core.LabelSessionID, sessionID)),
filters.Arg("label", fmt.Sprintf("%s=%t", core.LabelReaper, true)),
filters.Arg("label", fmt.Sprintf("%s=%t", core.LabelRyuk, true)),
filters.Arg("name", reaperContainerNameFromSessionID(sessionID)),
),
}
return backoff.RetryNotifyWithData(
func() (*DockerContainer, error) {
resp, err := dockerClient.ContainerList(ctx, opts)
if err != nil {
return nil, err
}
if len(resp) == 0 {
// reaper container not found in the running state: do not look for it again
return nil, nil
}
if len(resp) > 1 {
return nil, fmt.Errorf("not possible to have multiple reaper containers found for session ID %s", sessionID)
}
r, err := containerFromDockerResponse(ctx, resp[0])
if err != nil {
return nil, err
}
if r.healthStatus == types.Healthy || r.healthStatus == types.NoHealthcheck {
return r, nil
}
// if a health status is present on the container, and the container is healthy, error
if r.healthStatus != "" {
return nil, fmt.Errorf("container %s is not healthy, wanted status=%s, got status=%s", resp[0].ID[:8], types.Healthy, r.healthStatus)
}
return r, nil
},
backoff.WithContext(exp, ctx),
func(err error, duration time.Duration) {
Logger.Printf("Error looking up reaper container, will retry: %v", err)
},
)
}
// reuseOrCreateReaper returns an existing Reaper instance if it exists and is running. Otherwise, a new Reaper instance
// will be created with a sessionID to identify containers in the same test session/program.
func reuseOrCreateReaper(ctx context.Context, sessionID string, provider ReaperProvider) (*Reaper, error) {
reaperMutex.Lock()
defer reaperMutex.Unlock()
// 1. if the reaper instance has been already created, return it
if reaperInstance != nil {
// Verify this instance is still running by checking state.
// Can't use Container.IsRunning because the bool is not updated when Reaper is terminated
state, err := reaperInstance.container.State(ctx)
if err != nil {
if !errdefs.IsNotFound(err) {
return nil, err
}
} else if state.Running {
return reaperInstance, nil
}
// else: the reaper instance has been terminated, so we need to create a new one
reaperOnce = sync.Once{}
}
// 2. because the reaper instance has not been created yet, look for it in the Docker daemon, which
// will happen if the reaper container has been created in the same test session but in a different
// test process execution (e.g. when running tests in parallel), not having initialized the reaper
// instance yet.
reaperContainer, err := lookUpReaperContainer(context.Background(), sessionID)
if err == nil && reaperContainer != nil {
// The reaper container exists as a Docker container: re-use it
Logger.Printf("🔥 Reaper obtained from Docker for this test session %s", reaperContainer.ID)
reaperInstance, err = reuseReaperContainer(ctx, sessionID, provider, reaperContainer)
if err != nil {
return nil, err
}
return reaperInstance, nil
}
// 3. the reaper container does not exist in the Docker daemon: create it, and do it using the
// synchronization primitive to avoid multiple executions of this function to create the reaper
var reaperErr error
reaperOnce.Do(func() {
r, err := newReaper(ctx, sessionID, provider)
if err != nil {
reaperErr = err
return
}
reaperInstance, reaperErr = r, nil
})
if reaperErr != nil {
reaperOnce = sync.Once{}
return nil, reaperErr
}
return reaperInstance, nil
}
// reuseReaperContainer constructs a Reaper from an already running reaper
// DockerContainer.
func reuseReaperContainer(ctx context.Context, sessionID string, provider ReaperProvider, reaperContainer *DockerContainer) (*Reaper, error) {
endpoint, err := reaperContainer.PortEndpoint(ctx, "8080", "")
if err != nil {
return nil, err
}
Logger.Printf("⏳ Waiting for Reaper port to be ready")
var containerJson *types.ContainerJSON
if containerJson, err = reaperContainer.Inspect(ctx); err != nil {
return nil, fmt.Errorf("failed to inspect reaper container %s: %w", reaperContainer.ID[:8], err)
}
if containerJson != nil && containerJson.NetworkSettings != nil {
for port := range containerJson.NetworkSettings.Ports {
err := wait.ForListeningPort(port).
WithPollInterval(100*time.Millisecond).
WaitUntilReady(ctx, reaperContainer)
if err != nil {
return nil, fmt.Errorf("failed waiting for reaper container %s port %s/%s to be ready: %w",
reaperContainer.ID[:8], port.Proto(), port.Port(), err)
}
}
}
return &Reaper{
Provider: provider,
SessionID: sessionID,
Endpoint: endpoint,
container: reaperContainer,
}, nil
}
// newReaper creates a Reaper with a sessionID to identify containers and a
// provider to use. Do not call this directly, use reuseOrCreateReaper instead.
func newReaper(ctx context.Context, sessionID string, provider ReaperProvider) (*Reaper, error) {
dockerHostMount := core.ExtractDockerSocket(ctx)
reaper := &Reaper{
Provider: provider,
SessionID: sessionID,
}
listeningPort := nat.Port("8080/tcp")
tcConfig := provider.Config().Config
req := ContainerRequest{
Image: config.ReaperDefaultImage,
ExposedPorts: []string{string(listeningPort)},
Labels: core.DefaultLabels(sessionID),
Privileged: tcConfig.RyukPrivileged,
WaitingFor: wait.ForListeningPort(listeningPort),
Name: reaperContainerNameFromSessionID(sessionID),
HostConfigModifier: func(hc *container.HostConfig) {
hc.AutoRemove = true
hc.Binds = []string{dockerHostMount + ":/var/run/docker.sock"}
hc.NetworkMode = Bridge
},
Env: map[string]string{},
}
if to := tcConfig.RyukConnectionTimeout; to > time.Duration(0) {
req.Env["RYUK_CONNECTION_TIMEOUT"] = to.String()
}
if to := tcConfig.RyukReconnectionTimeout; to > time.Duration(0) {
req.Env["RYUK_RECONNECTION_TIMEOUT"] = to.String()
}
if tcConfig.RyukVerbose {
req.Env["RYUK_VERBOSE"] = "true"
}
// include reaper-specific labels to the reaper container
req.Labels[core.LabelReaper] = "true"
req.Labels[core.LabelRyuk] = "true"
// Attach reaper container to a requested network if it is specified
if p, ok := provider.(*DockerProvider); ok {
req.Networks = append(req.Networks, p.DefaultNetwork)
}
c, err := provider.RunContainer(ctx, req)
if err != nil {
// We need to check whether the error is caused by a container with the same name
// already existing due to race conditions. We manually match the error message
// as we do not have any error types to check against.
if createContainerFailDueToNameConflictRegex.MatchString(err.Error()) {
// Manually retrieve the already running reaper container. However, we need to
// use retries here as there are two possible race conditions that might lead to
// errors: In most cases, there is a small delay between container creation and
// actually being visible in list-requests. This means that creation might fail
// due to name conflicts, but when we list containers with this name, we do not
// get any results. In another case, the container might have simply died in the
// meantime and therefore cannot be found.
const timeout = 5 * time.Second
const cooldown = 100 * time.Millisecond
start := time.Now()
var reaperContainer *DockerContainer
for time.Since(start) < timeout {
reaperContainer, err = lookUpReaperContainer(ctx, sessionID)
if err == nil && reaperContainer != nil {
break
}
select {
case <-ctx.Done():
case <-time.After(cooldown):
}
}
if err != nil {
return nil, fmt.Errorf("look up reaper container due to name conflict failed: %w", err)
}
// If the reaper container was not found, it is most likely to have died in
// between as we can exclude any client errors because of the previous error
// check. Because the reaper should only die if it performed clean-ups, we can
// fail here as the reaper timeout needs to be increased, anyway.
if reaperContainer == nil {
return nil, fmt.Errorf("look up reaper container returned nil although creation failed due to name conflict")
}
Logger.Printf("🔥 Reaper obtained from Docker for this test session %s", reaperContainer.ID)
reaper, err := reuseReaperContainer(ctx, sessionID, provider, reaperContainer)
if err != nil {
return nil, err
}
return reaper, nil
}
return nil, err
}
reaper.container = c
endpoint, err := c.PortEndpoint(ctx, "8080", "")
if err != nil {
return nil, err
}
reaper.Endpoint = endpoint
return reaper, nil
}
// Reaper is used to start a sidecar container that cleans up resources
type Reaper struct {
Provider ReaperProvider
SessionID string
Endpoint string
container Container
}
// Connect runs a goroutine which can be terminated by sending true into the returned channel
func (r *Reaper) Connect() (chan bool, error) {
conn, err := net.DialTimeout("tcp", r.Endpoint, 10*time.Second)
if err != nil {
return nil, fmt.Errorf("%w: Connecting to Ryuk on %s failed", err, r.Endpoint)
}
terminationSignal := make(chan bool)
go func(conn net.Conn) {
sock := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
defer conn.Close()
labelFilters := []string{}
for l, v := range core.DefaultLabels(r.SessionID) {
labelFilters = append(labelFilters, fmt.Sprintf("label=%s=%s", l, v))
}
retryLimit := 3
for retryLimit > 0 {
retryLimit--
if _, err := sock.WriteString(strings.Join(labelFilters, "&")); err != nil {
continue
}
if _, err := sock.WriteString("\n"); err != nil {
continue
}
if err := sock.Flush(); err != nil {
continue
}
resp, err := sock.ReadString('\n')
if err != nil {
continue
}
if resp == "ACK\n" {
break
}
}
<-terminationSignal
}(conn)
return terminationSignal, nil
}
// Labels returns the container labels to use so that this Reaper cleans them up
// Deprecated: internally replaced by core.DefaultLabels(sessionID)
func (r *Reaper) Labels() map[string]string {
return map[string]string{
core.LabelLang: "go",
core.LabelSessionID: r.SessionID,
}
}