diff --git a/README.md b/README.md index 71d193a22..13f6a7cd9 100644 --- a/README.md +++ b/README.md @@ -492,6 +492,14 @@ By default, Dapr will use the `POST` verb. If your app uses Dapr for gRPC, you s dapr invoke --app-id nodeapp --method mymethod --verb GET ``` +Invoke your app in Kubernetes mode: + +If your app running in a Kubernetes cluster, use the `invoke` command with `--kubernetes` flag or the `-k` shorthand. + +``` +$ dapr invoke --kubernetes --app-id nodeapp --method mymethod +``` + ### List To list all Dapr instances running on your machine: diff --git a/cmd/invoke.go b/cmd/invoke.go index c8b009f59..968d83e33 100644 --- a/cmd/invoke.go +++ b/cmd/invoke.go @@ -21,6 +21,7 @@ import ( "github.com/spf13/cobra" + "github.com/dapr/cli/pkg/kubernetes" "github.com/dapr/cli/pkg/print" "github.com/dapr/cli/pkg/standalone" ) @@ -38,15 +39,18 @@ var ( var InvokeCmd = &cobra.Command{ Use: "invoke", - Short: "Invoke a method on a given Dapr application. Supported platforms: Self-hosted", + Short: "Invoke a method on a given Dapr application. Supported platforms: Kubernetes and self-hosted", Example: ` -# Invoke a sample method on target app with POST Verb -dapr invoke --app-id target --method sample --data '{"key":"value"} +# Invoke a sample method on target app with POST Verb in self-hosted mode +dapr invoke --app-id target --method sample --data '{"key":"value"}' -# Invoke a sample method on target app with GET Verb +# Invoke a sample method on target app with in Kubernetes +dapr invoke -k --app-id target --method sample --data '{"key":"value"}' + +# Invoke a sample method on target app with GET Verb in self-hosted mode dapr invoke --app-id target --method sample --verb GET -# Invoke a sample method on target app with GET Verb using Unix domain socket +# Invoke a sample method on target app with GET Verb using Unix domain socket in self-hosted mode dapr invoke --unix-domain-socket --app-id target --method sample --verb GET `, Run: func(cmd *cobra.Command, args []string) { @@ -66,7 +70,6 @@ dapr invoke --unix-domain-socket --app-id target --method sample --verb GET } else if invokeData != "" { bytePayload = []byte(invokeData) } - client := standalone.NewClient() // TODO(@daixiang0): add Windows support. if invokeSocket != "" { @@ -78,7 +81,14 @@ dapr invoke --unix-domain-socket --app-id target --method sample --verb GET } } - response, err := client.Invoke(invokeAppID, invokeAppMethod, bytePayload, invokeVerb, invokeSocket) + var response string + if kubernetesMode { + response, err = kubernetes.Invoke(invokeAppID, invokeAppMethod, bytePayload, invokeVerb) + } else { + client := standalone.NewClient() + response, err = client.Invoke(invokeAppID, invokeAppMethod, bytePayload, invokeVerb, invokeSocket) + } + if err != nil { err = fmt.Errorf("error invoking app %s: %w", invokeAppID, err) print.FailureStatusEvent(os.Stderr, err.Error()) @@ -93,6 +103,7 @@ dapr invoke --unix-domain-socket --app-id target --method sample --verb GET } func init() { + InvokeCmd.Flags().BoolVarP(&kubernetesMode, "kubernetes", "k", false, "Invoke a method on a Dapr application in a Kubernetes cluster") InvokeCmd.Flags().StringVarP(&invokeAppID, "app-id", "a", "", "The application id to invoke") InvokeCmd.Flags().StringVarP(&invokeAppMethod, "method", "m", "", "The method to invoke") InvokeCmd.Flags().StringVarP(&invokeData, "data", "d", "", "The JSON serialized data string (optional)") diff --git a/pkg/kubernetes/invoke.go b/pkg/kubernetes/invoke.go new file mode 100644 index 000000000..a2ffa212a --- /dev/null +++ b/pkg/kubernetes/invoke.go @@ -0,0 +1,175 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "context" + "fmt" + "net/url" + "strings" + + core_v1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/net" + k8s "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +type AppInfo struct { + AppID string `csv:"APP ID" json:"appId" yaml:"appId"` + HTTPPort string `csv:"HTTP PORT" json:"httpPort" yaml:"httpPort"` + GRPCPort string `csv:"GRPC PORT" json:"grpcPort" yaml:"grpcPort"` + AppPort string `csv:"APP PORT" json:"appPort" yaml:"appPort"` + PodName string `csv:"POD NAME" json:"podName" yaml:"podName"` + Namespace string `csv:"NAMESPACE" json:"namespace" yaml:"namespace"` +} + +type ( + DaprPod core_v1.Pod + DaprAppList []*AppInfo +) + +// Invoke is a command to invoke a remote or local dapr instance. +func Invoke(appID, method string, data []byte, verb string) (string, error) { + client, err := Client() + if err != nil { + return "", err + } + + app, err := GetAppInfo(client, appID) + if err != nil { + return "", err + } + + return invoke(client.CoreV1().RESTClient(), app, method, data, verb) +} + +func invoke(client rest.Interface, app *AppInfo, method string, data []byte, verb string) (string, error) { + res, err := app.Request(client.Verb(verb), method, data, verb) + if err != nil { + return "", fmt.Errorf("error get request: %w", err) + } + + result := res.Do(context.TODO()) + rawbody, err := result.Raw() + if err != nil { + return "", fmt.Errorf("error get raw: %w", err) + } + + if len(rawbody) > 0 { + return string(rawbody), nil + } + + return "", nil +} + +func GetAppInfo(client k8s.Interface, appID string) (*AppInfo, error) { + list, err := ListAppInfos(client, appID) + if err != nil { + return nil, err + } + if len(list) == 0 { + return nil, fmt.Errorf("%s not found", appID) + } + app := list[0] + return app, nil +} + +// List outputs plugins. +func ListAppInfos(client k8s.Interface, appIDs ...string) (DaprAppList, error) { + opts := v1.ListOptions{} + podList, err := client.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), opts) + if err != nil { + return nil, fmt.Errorf("err get pods list:%w", err) + } + + fn := func(*AppInfo) bool { + return true + } + if len(appIDs) > 0 { + fn = func(a *AppInfo) bool { + for _, id := range appIDs { + if id != "" && a.AppID == id { + return true + } + } + return false + } + } + + l := make(DaprAppList, 0) + for _, p := range podList.Items { + p := DaprPod(p) + for _, c := range p.Spec.Containers { + if c.Name == "daprd" { + app := getAppInfoFromPod(&p) + if fn(app) { + l = append(l, app) + } + } + } + } + + return l, nil +} + +func getAppInfoFromPod(p *DaprPod) *AppInfo { + var appInfo *AppInfo + for _, c := range p.Spec.Containers { + if c.Name == "daprd" { + appInfo = &AppInfo{ + PodName: p.Name, + Namespace: p.Namespace, + } + for i, arg := range c.Args { + switch arg { + case "--app-port": + appInfo.AppPort = c.Args[i+1] + case "--dapr-http-port": + appInfo.HTTPPort = c.Args[i+1] + case "--dapr-grpc-port": + appInfo.GRPCPort = c.Args[i+1] + case "--app-id": + appInfo.AppID = c.Args[i+1] + } + } + } + } + + return appInfo +} + +func (a *AppInfo) Request(r *rest.Request, method string, data []byte, verb string) (*rest.Request, error) { + r = r.Namespace(a.Namespace). + Resource("pods"). + SubResource("proxy"). + SetHeader("Content-Type", "application/json"). + Name(net.JoinSchemeNamePort("", a.PodName, a.AppPort)) + if data != nil { + r = r.Body(data) + } + + u, err := url.Parse(method) + if err != nil { + return nil, fmt.Errorf("error parse method %s: %w", method, err) + } + + r = r.Suffix(u.Path) + + for k, vs := range u.Query() { + r = r.Param(k, strings.Join(vs, ",")) + } + + return r, nil +} diff --git a/pkg/kubernetes/invoke_test.go b/pkg/kubernetes/invoke_test.go new file mode 100644 index 000000000..a39d9bd6d --- /dev/null +++ b/pkg/kubernetes/invoke_test.go @@ -0,0 +1,242 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utiltesting "k8s.io/client-go/util/testing" +) + +func newDaprAppPod(name string, namespace string, appName string, creationTime time.Time, appPort string, httpPort string, grpcPort string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Annotations: map[string]string{}, + Labels: map[string]string{ + "app": appName, + }, + CreationTimestamp: metav1.Time{ + Time: creationTime, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + {}, + { + Name: "daprd", + Args: []string{ + "--mode", + "kubernetes", + "--dapr-http-port", + httpPort, + "--dapr-grpc-port", + grpcPort, + "--dapr-internal-grpc-port", + "50002", + "--dapr-listen-addresses", + "[::1],127.0.0.1", + "--dapr-public-port", + "3501", + "--app-port", + appPort, + "--app-id", + appName, + "--control-plane-address", + "dapr-api.keel-system.svc.cluster.local:80", + "--app-protocol", + "http", + "--placement-host-address", + "dapr-placement-server.keel-system.svc.cluster.local:50005", + "--config", + "testAppID-Config", + "--log-level", + "info", + "--app-max-concurrency", + "-1", + "--sentry-address", + "dapr-sentry.keel-system.svc.cluster.local:80", + "--enable-metrics=true", + "--metrics-port", + "9090", + "--dapr-http-max-request-size", + "-1", + "--enable-mtls", + }, + }, + }, + }, + } +} + +func Test_getAppInfo(t *testing.T) { + client := fake.NewSimpleClientset(newDaprAppPod( + "testAppPod", "testAppNameSpace", + "testAppID", time.Now(), + "8080", "80801", "80802")) + + testCases := []struct { + name string + errorExpected bool + errString string + appID string + want *AppInfo + }{ + { + name: "get test Pod", + appID: "testAppID", + errorExpected: false, + errString: "", + want: &AppInfo{ + AppID: "testAppID", HTTPPort: "80801", GRPCPort: "80802", AppPort: "8080", PodName: "testAppPod", Namespace: "testAppNameSpace", + }, + }, + { + name: "get error Pod", + appID: "errorAppID", + errorExpected: true, + errString: "errorAppID not found", + want: nil, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + appInfo, err := GetAppInfo(client, tc.appID) + if tc.errorExpected { + assert.Error(t, err, "expected an error") + assert.Equal(t, tc.errString, err.Error(), "expected error strings to match") + } else { + assert.NoError(t, err, "expected no error") + assert.Equal(t, tc.want, appInfo, "expected appInfo to match") + } + }) + } +} + +func Test_invoke(t *testing.T) { + app := &AppInfo{ + AppID: "testAppID", AppPort: "8080", HTTPPort: "3500", GRPCPort: "50001", PodName: "testAppPod", Namespace: "testAppNameSpace", + } + + testCases := []struct { + name string + errorExpected bool + errString string + appID string + method string + verb string + data []byte + URLExpected string + }{ + { + name: "get request", + errorExpected: false, + errString: "", + method: "hello", + verb: "GET", + data: nil, + URLExpected: "https://localhost/api/v1/" + + "namespaces/testAppNameSpace/pods/testAppPod:8080/proxy/" + + "hello", + }, + { + name: "get request", + errorExpected: false, + errString: "", + method: "hello?abc=123&cdr=345#abb=aaa", + verb: "GET", + data: nil, + URLExpected: "https://localhost/api/v1/" + + "namespaces/testAppNameSpace/pods/testAppPod:8080/proxy/" + + "hello?abc=123&cdr=345#abb=aaa", + }, + { + name: "post request", + errorExpected: false, + errString: "", + method: "hello?abc=123&cdr=345#abb=aaa", + verb: "POST", + data: []byte("hello"), + URLExpected: "https://localhost/api/v1/" + + "namespaces/testAppNameSpace/pods/testAppPod:8080/proxy/" + + "hello?abc=123&cdr=345#abb=aaa", + }, + { + name: "post request", + errorExpected: false, + errString: "errorAppID not found", + method: "hello", + verb: "POST", + data: []byte("hello"), + URLExpected: "https://localhost/api/v1/" + + "namespaces/testAppNameSpace/pods/testAppPod:8080/proxy/" + + "hello", + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + testServer, fakeHandler := testServerEnv(t, 200) + defer testServer.Close() + client, err := restClient(testServer) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + _, err = invoke(client, app, tc.method, tc.data, tc.verb) + if tc.errorExpected { + assert.Error(t, err, "expected an error") + assert.Equal(t, tc.errString, err.Error(), "expected error strings to match") + } else { + assert.NoError(t, err, "expected no error") + data := string(tc.data) + fakeHandler.ValidateRequest(t, tc.URLExpected, tc.verb, &data) + } + }) + } +} + +func testServerEnv(t *testing.T, statusCode int) (*httptest.Server, *utiltesting.FakeHandler) { + t.Helper() + fakeHandler := utiltesting.FakeHandler{ + StatusCode: statusCode, + ResponseBody: "", + T: t, + } + testServer := httptest.NewServer(&fakeHandler) + return testServer, &fakeHandler +} + +func restClient(testServer *httptest.Server) (*rest.RESTClient, error) { + c, err := rest.RESTClientFor(&rest.Config{ + Host: testServer.URL, + ContentConfig: rest.ContentConfig{ + GroupVersion: &v1.SchemeGroupVersion, + NegotiatedSerializer: scheme.Codecs.WithoutConversion(), + }, + APIPath: "api", + Username: "user", + Password: "pass", + }) + return c, err +} diff --git a/tests/e2e/kubernetes/invoke_test.go b/tests/e2e/kubernetes/invoke_test.go new file mode 100644 index 000000000..b18ab47c5 --- /dev/null +++ b/tests/e2e/kubernetes/invoke_test.go @@ -0,0 +1,310 @@ +//go:build e2e +// +build e2e + +/* +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes_test + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" + core_v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + k8s "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + + "github.com/dapr/cli/tests/e2e/common" + "github.com/dapr/cli/tests/e2e/spawn" +) + +const ( + // invokeAppID is the Dapr app id used by the test app. + invokeAppID = "invoke-e2e-app" + // invokeAppNamespace keeps the test app isolated from the Dapr control plane namespace. + invokeAppNamespace = "dapr-cli-invoke-test" + // invokeAppImage is a minimal HTTP server that returns a known body on GET /. + // nginx is used because it is pulled by KinD base images and does not require Dapr SDK wiring: + // the cli's `invoke -k` path targets the pod's appPort directly, not daprd, so any HTTP server works. + invokeAppImage = "nginx:1.27-alpine" + // invokeAppPort is the container port nginx listens on and the value we annotate as dapr.io/app-port. + invokeAppPort = 80 +) + +// TestKubernetesInvoke verifies `dapr invoke -k` against a daprized app running in a KinD cluster. +// +// Flow: +// 1. Ensure a clean environment and install Dapr in the default test namespace. +// 2. Deploy a minimal HTTP server pod annotated for Dapr into a dedicated namespace. +// 3. Wait for the pod + daprd sidecar to be Ready so `dapr list -k` can discover it. +// 4. Run `dapr invoke -k --app-id ... --method ... --verb GET` and assert the response +// contains the expected payload served by the app. This exercises the codepath in +// pkg/kubernetes/invoke.go which proxies through the API server to appPort. +// 5. Tear the app and Dapr down regardless of outcome. +func TestKubernetesInvoke(t *testing.T) { + if common.ShouldSkipTest(common.DaprModeNonHA) { + t.Skipf("Skipping %s mode test", common.DaprModeNonHA) + } + + ensureCleanEnv(t, false) + + // Install Dapr first – invoke -k requires the operator / sidecar injector to be present. + installTests := common.GetTestsOnInstall(currentVersionDetails, common.TestOptions{ + HAEnabled: false, + MTLSEnabled: false, + ApplyComponentChanges: true, + CheckResourceExists: map[common.Resource]bool{ + common.CustomResourceDefs: true, + common.ClusterRoles: true, + common.ClusterRoleBindings: true, + }, + }) + uninstallTests := common.GetTestsOnUninstall(currentVersionDetails, common.TestOptions{ + CheckResourceExists: map[common.Resource]bool{ + common.CustomResourceDefs: true, + common.ClusterRoles: false, + common.ClusterRoleBindings: false, + }, + }) + + // Make sure the test app and Dapr are always torn down, even on early return. + // `t.Run` is allowed from a deferred function because defers execute before the + // test function returns, i.e. before the testing runtime marks the test as done. + // It is NOT allowed from t.Cleanup, which is why this lives here instead. + defer func() { + if err := deleteInvokeTestApp(); err != nil { + t.Logf("failed to delete test app: %v", err) + } + for _, tc := range uninstallTests { + t.Run(tc.Name, tc.Callable) + } + }() + + for _, tc := range installTests { + t.Run(tc.Name, tc.Callable) + if t.Failed() { + return + } + } + + t.Run("deploy test app", func(t *testing.T) { + require.NoError(t, deployInvokeTestApp(t)) + }) + if t.Failed() { + return + } + + t.Run("invoke via kubernetes flag", func(t *testing.T) { + daprPath := common.GetDaprPath() + // Retry invoke because the daprd sidecar may take a few seconds after the app + // pod reports Ready before the dapr operator/list path can see it. + var ( + output string + err error + ) + deadline := time.Now().Add(90 * time.Second) + for time.Now().Before(deadline) { + output, err = spawn.Command(daprPath, + "invoke", + "--log-as-json", + "-k", + "--app-id", invokeAppID, + "--method", "/", + "--verb", "GET", + ) + if err == nil && strings.Contains(output, "nginx") { + break + } + time.Sleep(3 * time.Second) + } + require.NoError(t, err, "dapr invoke -k failed. output:\n%s", output) + // nginx welcome page contains the string "Welcome to nginx!" – match loosely so + // a future version bump does not break the assertion. + assert.Contains(t, output, "nginx", "unexpected invoke output:\n%s", output) + }) + + t.Run("invoke unknown app returns error", func(t *testing.T) { + daprPath := common.GetDaprPath() + output, err := spawn.Command(daprPath, + "invoke", + "--log-as-json", + "-k", + "--app-id", "this-app-does-not-exist", + "--method", "/", + "--verb", "GET", + ) + require.Error(t, err, "invoke against a missing app should fail. output:\n%s", output) + assert.Contains(t, output, "not found") + }) +} + +// deployInvokeTestApp creates a dedicated namespace and a daprized nginx deployment whose pod +// is annotated with the appID + appPort the invoke test expects. It blocks until the pod is Ready. +func deployInvokeTestApp(t *testing.T) error { + client, err := newKubeClient() + if err != nil { + return fmt.Errorf("build kube client: %w", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + // 1. Namespace – create if missing, reuse otherwise so repeated runs on a dirty cluster still work. + ns := &core_v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: invokeAppNamespace}, + } + if _, err := client.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{}); err != nil { + if !errors.IsAlreadyExists(err) { + return fmt.Errorf("create namespace: %w", err) + } + } + + // 2. Deployment – a single-replica nginx with Dapr annotations so the injector adds daprd. + replicas := int32(1) + deploy := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: invokeAppID, + Namespace: invokeAppNamespace, + Labels: map[string]string{"app": invokeAppID}, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": invokeAppID}}, + Template: core_v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"app": invokeAppID}, + Annotations: map[string]string{ + "dapr.io/enabled": "true", + "dapr.io/app-id": invokeAppID, + "dapr.io/app-port": fmt.Sprintf("%d", invokeAppPort), + }, + }, + Spec: core_v1.PodSpec{ + Containers: []core_v1.Container{{ + Name: "app", + Image: invokeAppImage, + Ports: []core_v1.ContainerPort{{ + ContainerPort: invokeAppPort, + Protocol: core_v1.ProtocolTCP, + }}, + ReadinessProbe: &core_v1.Probe{ + ProbeHandler: core_v1.ProbeHandler{ + TCPSocket: &core_v1.TCPSocketAction{ + Port: intstr.FromInt(invokeAppPort), + }, + }, + InitialDelaySeconds: 1, + PeriodSeconds: 2, + }, + }}, + }, + }, + }, + } + if _, err := client.AppsV1().Deployments(invokeAppNamespace).Create(ctx, deploy, metav1.CreateOptions{}); err != nil { + if !errors.IsAlreadyExists(err) { + return fmt.Errorf("create deployment: %w", err) + } + } + + // 3. Wait for at least one pod to be Ready and carry the daprd sidecar so `dapr invoke -k` can find it. + return waitForInvokeAppReady(ctx, t, client) +} + +func waitForInvokeAppReady(ctx context.Context, t *testing.T, client k8s.Interface) error { + ticker := time.NewTicker(3 * time.Second) + defer ticker.Stop() + for { + pods, err := client.CoreV1().Pods(invokeAppNamespace).List(ctx, metav1.ListOptions{ + LabelSelector: "app=" + invokeAppID, + }) + if err == nil { + for _, pod := range pods.Items { + if pod.Status.Phase != core_v1.PodRunning { + continue + } + if !allContainersReady(pod) { + continue + } + if !hasDaprdSidecar(pod) { + continue + } + t.Logf("test app pod ready: %s (%d containers)", pod.Name, len(pod.Status.ContainerStatuses)) + return nil + } + } + select { + case <-ctx.Done(): + return fmt.Errorf("timed out waiting for test app pod to become ready: %w", ctx.Err()) + case <-ticker.C: + } + } +} + +func allContainersReady(pod core_v1.Pod) bool { + if len(pod.Status.ContainerStatuses) == 0 { + return false + } + for _, cs := range pod.Status.ContainerStatuses { + if !cs.Ready { + return false + } + } + return true +} + +func hasDaprdSidecar(pod core_v1.Pod) bool { + for _, c := range pod.Spec.Containers { + if c.Name == "daprd" { + return true + } + } + return false +} + +// deleteInvokeTestApp tears down the namespace created for the test app. Deletion is synchronous +// enough for the subsequent Dapr uninstall to succeed; we do not wait for it to complete. +func deleteInvokeTestApp() error { + client, err := newKubeClient() + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + if err := client.CoreV1().Namespaces().Delete(ctx, invokeAppNamespace, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) { + return err + } + return nil +} + +// newKubeClient builds a Kubernetes client from the test environment's kubeconfig. It mirrors +// the behaviour of common.getClient() but is kept local because that helper is unexported. +func newKubeClient() (*k8s.Clientset, error) { + loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() + config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( + loadingRules, &clientcmd.ConfigOverrides{}, + ).ClientConfig() + if err != nil { + return nil, err + } + return k8s.NewForConfig(config) +}