diff --git a/pkg/log/task_reader.go b/pkg/log/task_reader.go index 1b92320196..9ec8efa9b3 100644 --- a/pkg/log/task_reader.go +++ b/pkg/log/task_reader.go @@ -120,13 +120,13 @@ func (r *Reader) readAvailableTaskLogs(tr *v1.TaskRun) (<-chan Log, <-chan error return logC, errC, nil } -func (r *Reader) readStepsLogs(logC chan<- Log, errC chan<- error, steps []*step, pod *pods.Pod, follow, timestamps bool) { +func (r *Reader) readStepsLogs(logC chan<- Log, errC chan<- error, steps []*step, podRef *pods.Pod, pod *corev1.Pod, follow, timestamps bool) { for _, step := range steps { if !follow && !step.hasStarted() { continue } - container := pod.Container(step.container) + container := podRef.Container(step.container) containerLogC, containerLogErrC, err := container.LogReader(follow, timestamps).Read() if err != nil { errC <- fmt.Errorf("error in getting logs for step %s: %s", step.name, err) @@ -153,10 +153,16 @@ func (r *Reader) readStepsLogs(logC chan<- Log, errC chan<- error, steps []*step } } - if err := container.Status(); err != nil { - errC <- err - return + err = pods.CheckFailedContainers(pod, []string{step.container}) + if follow { + err = podRef.CheckFailedContainers([]string{step.container}) } + if err == nil { + continue + } + + errC <- err + return } } @@ -206,7 +212,7 @@ func (r *Reader) readPodLogs(podC <-chan string, podErrC <-chan error, follow, t errC <- fmt.Errorf("no steps found for task %s", r.task) continue } - r.readStepsLogs(logC, errC, steps, p, follow, timestamps) + r.readStepsLogs(logC, errC, steps, p, pod, follow, timestamps) } }() diff --git a/pkg/log/task_reader_test.go b/pkg/log/task_reader_test.go new file mode 100644 index 0000000000..e454159a45 --- /dev/null +++ b/pkg/log/task_reader_test.go @@ -0,0 +1,106 @@ +package log + +import ( + "testing" + + "github.com/tektoncd/cli/pkg/cli" + podsfake "github.com/tektoncd/cli/pkg/pods/fake" + "github.com/tektoncd/cli/pkg/test" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestReadPodLogs_stopsAfterFailedStep(t *testing.T) { + const ( + ns = "ns" + podName = "pod" + ) + + pods := []*corev1.Pod{{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: ns, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "step-first"}, + {Name: "step-second"}, + }, + }, + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "step-first", + State: corev1.ContainerState{ + Terminated: &corev1.ContainerStateTerminated{ + ExitCode: 137, + Reason: "OOMKilled", + }, + }, + }, + { + Name: "step-second", + State: corev1.ContainerState{ + Terminated: &corev1.ContainerStateTerminated{ + ExitCode: 0, + }, + }, + }, + }, + }, + }} + + cs, _ := test.SeedV1beta1TestData(t, test.Data{Pods: pods}) + reader := &Reader{ + ns: ns, + clients: &cli.Clients{Kube: cs.Kube}, + streamer: podsfake.Streamer(podsfake.Logs( + podsfake.Task(podName, + podsfake.Step("step-first", "first-log"), + podsfake.Step("step-second", "second-log"), + ), + )), + task: "task", + } + + podC := make(chan string, 1) + podC <- podName + close(podC) + + logC, errC := reader.readPodLogs(podC, nil, false, false) + + var logs []Log + var errs []error + for logC != nil || errC != nil { + select { + case l, ok := <-logC: + if !ok { + logC = nil + continue + } + logs = append(logs, l) + case err, ok := <-errC: + if !ok { + errC = nil + continue + } + errs = append(errs, err) + } + } + + if len(logs) != 2 { + t.Fatalf("expected first step log and EOF only, got %#v", logs) + } + if logs[0].Step != "first" || logs[0].Log != "first-log" { + t.Fatalf("unexpected first log: %#v", logs[0]) + } + if logs[1].Step != "first" || logs[1].Log != "EOFLOG" { + t.Fatalf("unexpected EOF log: %#v", logs[1]) + } + if len(errs) != 1 { + t.Fatalf("expected one error, got %#v", errs) + } + if errs[0] == nil || errs[0].Error() == "" { + t.Fatalf("expected non-empty error, got %#v", errs[0]) + } +} diff --git a/pkg/pipelinerun/tracker.go b/pkg/pipelinerun/tracker.go index cac39174c6..7901a955d4 100644 --- a/pkg/pipelinerun/tracker.go +++ b/pkg/pipelinerun/tracker.go @@ -17,6 +17,7 @@ package pipelinerun import ( "context" "errors" + "fmt" "sync" "time" @@ -27,6 +28,7 @@ import ( "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" informers "github.com/tektoncd/pipeline/pkg/client/informers/externalversions" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/client-go/tools/cache" @@ -223,13 +225,21 @@ func GetTaskRunsWithStatus(pr *v1.PipelineRun, c *cli.Clients, ns string) (map[s return map[string]*v1.PipelineRunTaskRunStatus{}, nil } + taskRunsByName, err := getTaskRunsByPipelineRun(pr.Name, c, ns) + if err != nil && !canFallbackToTaskRunGet(err) { + return nil, err + } + trStatuses := make(map[string]*v1.PipelineRunTaskRunStatus) for _, cr := range pr.Status.ChildReferences { //TODO: Needs to handle Run, CustomRun later if cr.Kind == "TaskRun" { - tr, err := taskrunpkg.GetTaskRun(taskrunGroupResource, c, cr.Name, ns) - if err != nil { - return nil, err + tr, ok := taskRunsByName[cr.Name] + if !ok { + tr, err = taskrunpkg.GetTaskRun(taskrunGroupResource, c, cr.Name, ns) + if err != nil { + return nil, err + } } trStatuses[cr.Name] = &v1.PipelineRunTaskRunStatus{ @@ -242,3 +252,23 @@ func GetTaskRunsWithStatus(pr *v1.PipelineRun, c *cli.Clients, ns string) (map[s return trStatuses, nil } + +func getTaskRunsByPipelineRun(prName string, c *cli.Clients, ns string) (map[string]*v1.TaskRun, error) { + var taskRuns v1.TaskRunList + if err := actions.ListV1(taskrunGroupResource, c, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("tekton.dev/pipelineRun=%s", prName), + }, ns, &taskRuns); err != nil { + return nil, err + } + + taskRunsByName := make(map[string]*v1.TaskRun, len(taskRuns.Items)) + for i := range taskRuns.Items { + taskRunsByName[taskRuns.Items[i].Name] = &taskRuns.Items[i] + } + + return taskRunsByName, nil +} + +func canFallbackToTaskRunGet(err error) bool { + return apierrors.IsForbidden(err) || apierrors.IsUnauthorized(err) +} diff --git a/pkg/pipelinerun/tracker_test.go b/pkg/pipelinerun/tracker_test.go index 49d5822cd8..c5e3ca138a 100644 --- a/pkg/pipelinerun/tracker_test.go +++ b/pkg/pipelinerun/tracker_test.go @@ -33,6 +33,7 @@ import ( pipelinetest "github.com/tektoncd/pipeline/test" "github.com/tektoncd/pipeline/test/diff" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" @@ -266,3 +267,75 @@ func TestTracker_watchErrorHandler(t *testing.T) { }) } } + +func TestGetTaskRunsWithStatus_fallsBackWhenListForbidden(t *testing.T) { + const ( + ns = "namespace" + prName = "output-pipeline-1" + trName = "output-task-1" + task = "output-task-1" + pod = "output-task-1-pod" + ) + + pr := &v1.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: prName, + Namespace: ns, + }, + Status: v1.PipelineRunStatus{ + PipelineRunStatusFields: v1.PipelineRunStatusFields{ + ChildReferences: []v1.ChildStatusReference{{ + Name: trName, + PipelineTaskName: task, + TypeMeta: runtime.TypeMeta{ + APIVersion: "tekton.dev/v1", + Kind: "TaskRun", + }, + }}, + }, + }, + } + tr := &v1.TaskRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: trName, + Namespace: ns, + }, + Status: v1.TaskRunStatus{ + TaskRunStatusFields: v1.TaskRunStatusFields{ + PodName: pod, + }, + }, + } + + cs, _ := test.SeedTestData(t, pipelinetest.Data{TaskRuns: []*v1.TaskRun{tr}}) + cs.Pipeline.Resources = cb.APIResourceList("v1", []string{"taskrun", "pipelinerun"}) + + tdc := testDynamic.Options{ + PrependReactors: []testDynamic.PrependOpt{{ + Verb: "list", + Resource: "taskruns", + Action: func(_ k8stest.Action) (bool, runtime.Object, error) { + return true, nil, apierrors.NewForbidden(taskrunGroupResource.GroupResource(), "", errors.New("forbidden")) + }, + }}, + } + dynamic, err := tdc.Client(cb.UnstructuredTR(tr, "v1")) + if err != nil { + t.Fatalf("unable to create dynamic client: %v", err) + } + + clients := &cli.Clients{ + Tekton: cs.Pipeline, + Kube: cs.Kube, + Dynamic: dynamic, + } + + trStatuses, err := GetTaskRunsWithStatus(pr, clients, ns) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if trStatuses[trName] == nil || trStatuses[trName].Status == nil || trStatuses[trName].Status.PodName != pod { + t.Fatalf("unexpected taskrun statuses: %#v", trStatuses) + } +} diff --git a/pkg/pods/container.go b/pkg/pods/container.go index 6d41abd304..91b4568420 100644 --- a/pkg/pods/container.go +++ b/pkg/pods/container.go @@ -30,43 +30,7 @@ type Container struct { } func (c *Container) Status() error { - pod, err := c.pod.Get() - if err != nil { - return err - } - - container := c.name - for _, cs := range pod.Status.ContainerStatuses { - if cs.Name != container { - continue - } - - if cs.State.Terminated != nil && cs.State.Terminated.ExitCode == 1 { - msg := "" - - if cs.State.Terminated.Reason != "" && cs.State.Terminated.Reason != "Error" { - msg = msg + " : " + cs.State.Terminated.Reason - } - - if cs.State.Terminated.Message != "" && cs.State.Terminated.Message != "Error" { - msg = msg + " : " + cs.State.Terminated.Message - } - - return fmt.Errorf("container %s has failed %s", container, msg) - } - } - - for _, cs := range pod.Status.InitContainerStatuses { - if cs.Name != container { - continue - } - - if cs.State.Terminated != nil && cs.State.Terminated.ExitCode == 1 { - return fmt.Errorf("container %s has failed: %s", container, cs.State.Terminated.Reason) - } - } - - return nil + return c.pod.CheckFailedContainers([]string{c.name}) } // Log represents one log message from a pod @@ -128,3 +92,51 @@ func (lr *LogReader) Read() (<-chan Log, <-chan error, error) { return logC, errC, nil } + +func (p *Pod) CheckFailedContainers(containerNames []string) error { + pod, err := p.Get() + if err != nil { + return err + } + + return CheckFailedContainers(pod, containerNames) +} + +func CheckFailedContainers(pod *corev1.Pod, containerNames []string) error { + containerSet := map[string]struct{}{} + for _, containerName := range containerNames { + containerSet[containerName] = struct{}{} + } + + for _, cs := range pod.Status.ContainerStatuses { + if _, ok := containerSet[cs.Name]; !ok { + continue + } + + if cs.State.Terminated != nil && cs.State.Terminated.ExitCode != 0 { + msg := "" + + if cs.State.Terminated.Reason != "" && cs.State.Terminated.Reason != "Error" { + msg += " : " + cs.State.Terminated.Reason + } + + if cs.State.Terminated.Message != "" && cs.State.Terminated.Message != "Error" { + msg += " : " + cs.State.Terminated.Message + } + + return fmt.Errorf("container %s has failed %s", cs.Name, msg) + } + } + + for _, cs := range pod.Status.InitContainerStatuses { + if _, ok := containerSet[cs.Name]; !ok { + continue + } + + if cs.State.Terminated != nil && cs.State.Terminated.ExitCode != 0 { + return fmt.Errorf("container %s has failed: %s", cs.Name, cs.State.Terminated.Reason) + } + } + + return nil +} diff --git a/pkg/pods/container_test.go b/pkg/pods/container_test.go index 39d68ddc0a..92405b5f44 100644 --- a/pkg/pods/container_test.go +++ b/pkg/pods/container_test.go @@ -15,6 +15,7 @@ package pods import ( + "strings" "testing" "github.com/tektoncd/cli/pkg/pods/fake" @@ -101,6 +102,56 @@ func TestContainer_fetch_logs(t *testing.T) { } } +func TestCheckFailedContainers(t *testing.T) { + pod := &corev1.Pod{ + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "step-success", + State: corev1.ContainerState{ + Terminated: &corev1.ContainerStateTerminated{ExitCode: 0}, + }, + }, + { + Name: "step-failed", + State: corev1.ContainerState{ + Terminated: &corev1.ContainerStateTerminated{ + ExitCode: 137, + Reason: "CrashLoopBackOff", + Message: "boom", + }, + }, + }, + }, + InitContainerStatuses: []corev1.ContainerStatus{ + { + Name: "step-init-failed", + State: corev1.ContainerState{ + Terminated: &corev1.ContainerStateTerminated{ + ExitCode: 143, + Reason: "InitError", + }, + }, + }, + }, + }, + } + + if err := CheckFailedContainers(pod, []string{"step-success"}); err != nil { + t.Fatalf("unexpected error for successful container: %v", err) + } + + err := CheckFailedContainers(pod, []string{"step-failed"}) + if err == nil || !strings.Contains(err.Error(), "step-failed") { + t.Fatalf("expected failed container error, got %v", err) + } + + err = CheckFailedContainers(pod, []string{"step-init-failed"}) + if err == nil || !strings.Contains(err.Error(), "InitError") { + t.Fatalf("expected init container failure, got %v", err) + } +} + func containerLogs(lr *LogReader) ([]Log, error) { logC, errC, err := lr.Read() diff --git a/pkg/pods/pod.go b/pkg/pods/pod.go index b931184180..f314333786 100644 --- a/pkg/pods/pod.go +++ b/pkg/pods/pod.go @@ -19,14 +19,12 @@ import ( "errors" "fmt" "io" - "sync" - "time" "github.com/tektoncd/cli/pkg/pods/stream" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" - "k8s.io/client-go/informers" + "k8s.io/apimachinery/pkg/watch" k8s "k8s.io/client-go/kubernetes" typedv1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" @@ -70,90 +68,48 @@ func NewWithDefaults(name, ns string, client k8s.Interface) *Pod { } } -// podResult holds the result of pod status check -type podResult struct { - pod *corev1.Pod - err error -} - // Wait wait for the pod to get up and running func (p *Pod) Wait() (*corev1.Pod, error) { - // ensure pod exists before we actually check for it - if _, err := p.Get(); err != nil { - return nil, err - } - - stopC := make(chan struct{}) - mu := sync.Mutex{} - - var result podResult + for { + pod, err := p.Get() + if err != nil { + return nil, err + } - // Start watcher in a goroutine - go func() { - p.watcher(stopC, &result, &mu) - }() + if readyPod, err := checkPodStatus(pod); readyPod != nil || err != nil { + return readyPod, err + } - // Wait for stopC - <-stopC - return result.pod, result.err -} + watcher, err := p.Kc.CoreV1().Pods(p.Ns).Watch(context.Background(), metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", p.Name).String(), + ResourceVersion: pod.ResourceVersion, + }) + if err != nil { + return nil, err + } -func (p *Pod) watcher(stopC chan struct{}, result *podResult, mu *sync.Mutex) { - factory := informers.NewSharedInformerFactoryWithOptions( - p.Kc, time.Second*10, - informers.WithNamespace(p.Ns), - informers.WithTweakListOptions(podOpts(p.Name))) - - updatePodStatus := func(obj interface{}) { - mu.Lock() - defer mu.Unlock() - - pod, err := checkPodStatus(obj) - if pod != nil || err != nil { - result.pod = pod - result.err = err - close(stopC) + retry := false + for event := range watcher.ResultChan() { + updatedPod, ok, err := podFromWatchEvent(event) + if err != nil { + watcher.Stop() + return nil, err + } + if !ok { + retry = true + break + } + if readyPod, err := checkPodStatus(updatedPod); readyPod != nil || err != nil { + watcher.Stop() + return readyPod, err + } } - } + watcher.Stop() - informer := factory.Core().V1().Pods().Informer() - // Set a custom watch error handler that ignores context.Canceled errors - // to prevent "Failed to watch" log messages when the informer is stopped intentionally - _ = informer.SetWatchErrorHandlerWithContext(watchErrorHandler) - - _, err := informer.AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - select { - case <-stopC: - return - default: - updatePodStatus(obj) - } - }, - UpdateFunc: func(_, newObj interface{}) { - select { - case <-stopC: - return - default: - updatePodStatus(newObj) - } - }, - DeleteFunc: func(obj interface{}) { - select { - case <-stopC: - return - default: - updatePodStatus(obj) - } - }, - }) - if err != nil { - return + if retry { + continue + } } - - factory.Start(stopC) - factory.WaitForCacheSync(stopC) } func podOpts(name string) func(opts *metav1.ListOptions) { @@ -199,6 +155,18 @@ func checkPodStatus(obj interface{}) (*corev1.Pod, error) { return nil, nil } +func podFromWatchEvent(event watch.Event) (*corev1.Pod, bool, error) { + if event.Type == watch.Error { + return nil, false, nil + } + + pod, ok := event.Object.(*corev1.Pod) + if !ok { + return nil, false, nil + } + return pod, true, nil +} + // Get gets the pod func (p *Pod) Get() (*corev1.Pod, error) { return p.Kc.CoreV1().Pods(p.Ns).Get(context.Background(), p.Name, metav1.GetOptions{}) diff --git a/pkg/pods/pod_test.go b/pkg/pods/pod_test.go index 02879d7a2a..eee300c292 100644 --- a/pkg/pods/pod_test.go +++ b/pkg/pods/pod_test.go @@ -100,6 +100,91 @@ func Test_wait_pod_success(t *testing.T) { } } +func Test_wait_pod_ready_without_watch(t *testing.T) { + podname := "test" + ns := "ns" + + clients, _ := test.SeedV1beta1TestData(t, test.Data{Pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: podname, + Namespace: ns, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + }}) + + pod := NewWithDefaults(podname, ns, clients.Kube) + p, err := pod.Wait() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if p == nil || p.Name != podname { + t.Fatalf("unexpected pod result: %#v", p) + } +} + +func Test_wait_pod_retries_when_watch_closes(t *testing.T) { + podname := "test" + ns := "ns" + + initial := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podname, + Namespace: ns, + ResourceVersion: "1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + }, + } + running := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podname, + Namespace: ns, + ResourceVersion: "2", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + + clients, _ := test.SeedV1beta1TestData(t, test.Data{Pods: []*corev1.Pod{initial}}) + firstWatch := watch.NewFake() + secondWatch := watch.NewFake() + watchCalls := 0 + clients.Kube.PrependWatchReactor("pods", func(_ k8stest.Action) (bool, watch.Interface, error) { + watchCalls++ + if watchCalls == 1 { + go func() { + time.Sleep(50 * time.Millisecond) + firstWatch.Stop() + }() + return true, firstWatch, nil + } + + go func() { + time.Sleep(50 * time.Millisecond) + secondWatch.Modify(running) + }() + return true, secondWatch, nil + }) + + pod := NewWithDefaults(podname, ns, clients.Kube) + p, err := pod.Wait() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if p == nil || p.Status.Phase != corev1.PodRunning { + t.Fatalf("unexpected pod result: %#v", p) + } + if watchCalls < 2 { + t.Fatalf("expected Wait to retry watch, got %d watch calls", watchCalls) + } +} + func Test_wait_pod_fail(t *testing.T) { podname := "test" ns := "ns"