Skip to content

Commit

Permalink
Add CLI data features using shell command on MariaDB (#732)
Browse files Browse the repository at this point in the history
Signed-off-by: ashraful <[email protected]>
  • Loading branch information
AshrafulHaqueToni authored Nov 15, 2023
1 parent dfd5e3b commit 41dbfff
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 163 deletions.
6 changes: 0 additions & 6 deletions pkg/data/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@ limitations under the License.

package data

const (
caFile = "/tmp/ca.crt"
certFile = "/tmp/client.crt"
keyFile = "/tmp/client.key"
)

const (
actor = "kubedb-cli"
)
221 changes: 91 additions & 130 deletions pkg/data/mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,25 @@ package data
import (
"bytes"
"context"
"errors"
"fmt"
"log"
"os"
"os/exec"
"strconv"
"strings"

api "kubedb.dev/apimachinery/apis/kubedb/v1alpha2"
cs "kubedb.dev/apimachinery/client/clientset/versioned"
"kubedb.dev/cli/pkg/lib"

_ "github.com/go-sql-driver/mysql"
"github.com/spf13/cobra"
shell "gomodules.xyz/go-sh"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"kmodules.xyz/client-go/tools/portforward"
)

func InsertMariaDBDataCMD(f cmdutil.Factory) *cobra.Command {
Expand All @@ -52,9 +51,9 @@ func InsertMariaDBDataCMD(f cmdutil.Factory) *cobra.Command {
Aliases: []string{
"md",
},
Short: "Connect to a mariadb object",
Long: `Use this cmd to exec into a mariadb object's primary pod.`,
Example: `kubectl dba insert mariadb -n demo sample-mariadb --rows 1000`,
Short: " Insert data to mariadb",
Long: `Use this cmd to insert data into a mariadb database.`,
Example: `kubectl dba data insert mariadb -n demo sample-mariadb --rows 1000`,
Run: func(cmd *cobra.Command, args []string) {
if len(args) == 0 {
log.Fatal("Enter mariadb object's name as an argument")
Expand All @@ -75,14 +74,11 @@ func InsertMariaDBDataCMD(f cmdutil.Factory) *cobra.Command {
log.Fatal("Inserted rows must be greater than 0")
}

tunnel, err := lib.TunnelToDBService(opts.config, dbName, namespace, api.MySQLDatabasePort)
if err != nil {
log.Fatal("couldn't create tunnel, error: ", err)
if rows > 100000 {
log.Fatal("Inserted rows must be less than or equal 100000")
}

defer tunnel.Close()

err = opts.insertDataExecCmd(tunnel, rows)
err = opts.insertDataExecCmd(rows)
if err != nil {
log.Fatal(err)
}
Expand All @@ -94,10 +90,11 @@ func InsertMariaDBDataCMD(f cmdutil.Factory) *cobra.Command {
return mdInsertCmd
}

func (opts *mariadbOpts) insertDataExecCmd(tunnel *portforward.Tunnel, rows int) error {
command := `
USE mysql;
CREATE TABLE IF NOT EXISTS kubedb_table (id VARCHAR(255) PRIMARY KEY);
func (opts *mariadbOpts) insertDataExecCmd(rows int) error {
command := fmt.Sprintf(`
CREATE DATABASE IF NOT EXISTS %v;
USE %v;
CREATE TABLE IF NOT EXISTS %v (id VARCHAR(255) PRIMARY KEY);
DROP PROCEDURE IF EXISTS insert_data;
DELIMITER //
CREATE PROCEDURE insert_data(max_value INT)
Expand All @@ -118,10 +115,10 @@ func (opts *mariadbOpts) insertDataExecCmd(tunnel *portforward.Tunnel, rows int)
END WHILE;
END //
DELIMITER ;
CALL insert_data(` + fmt.Sprintf("%v", rows) + `);
`
CALL insert_data(`+fmt.Sprintf("%v", rows)+`);
`, KubeDBDatabaseName, KubeDBDatabaseName, KubeDBTableName)

_, err := opts.executeCommand(tunnel.Local, command)
_, err := opts.executeCommand(command)
if err != nil {
return err
}
Expand All @@ -141,9 +138,9 @@ func VerifyMariaDBDataCMD(f cmdutil.Factory) *cobra.Command {
Aliases: []string{
"md",
},
Short: "Verify rows in a MariaDB database",
Short: "Verify rows to a mariadb resource",
Long: `Use this cmd to verify data in a mariadb object`,
Example: `kubectl dba verify mariadb -n demo sample-mariadb --rows 1000`,
Example: `kubectl dba data verify mariadb -n demo sample-mariadb --rows 1000`,
Run: func(cmd *cobra.Command, args []string) {
if len(args) == 0 {
log.Fatal("Enter mariadb object's name as an argument.")
Expand All @@ -160,13 +157,7 @@ func VerifyMariaDBDataCMD(f cmdutil.Factory) *cobra.Command {
log.Fatalln(err)
}

tunnel, err := lib.TunnelToDBService(opts.config, dbName, namespace, api.MySQLDatabasePort)
if err != nil {
log.Fatal("couldn't create tunnel, error: ", err)
}
defer tunnel.Close()

err = opts.verifyDataExecCmd(tunnel, rows)
err = opts.verifyDataExecCmd(rows)
if err != nil {
log.Fatal(err)
}
Expand All @@ -178,23 +169,27 @@ func VerifyMariaDBDataCMD(f cmdutil.Factory) *cobra.Command {
return mdVerifyCmd
}

func (opts *mariadbOpts) verifyDataExecCmd(tunnel *portforward.Tunnel, rows int) error {
func (opts *mariadbOpts) verifyDataExecCmd(rows int) error {
if rows <= 0 {
return fmt.Errorf("rows need to be greater than 0")
}

command := `
USE mysql;
SELECT COUNT(*) FROM kubedb_table;
`
out, err := opts.executeCommand(tunnel.Local, command)
command := fmt.Sprintf(`
CREATE DATABASE IF NOT EXISTS %v;
USE %v;
CREATE TABLE IF NOT EXISTS kubedb_table (id VARCHAR(255) PRIMARY KEY);
SELECT COUNT(*) FROM kubedb_table;
`, KubeDBDatabaseName, KubeDBDatabaseName)

o, err := opts.executeCommand(command)
if err != nil {
return err
}

out := string(o)
output := strings.Split(out, "\n")

totalKeys, err := strconv.Atoi(strings.TrimPrefix(output[1], " "))
totalKeys, err := strconv.Atoi(strings.TrimSpace(output[1]))
if err != nil {
return err
}
Expand All @@ -214,9 +209,9 @@ func DropMariaDBDataCMD(f cmdutil.Factory) *cobra.Command {
Aliases: []string{
"md",
},
Short: "Verify rows in a MariaDB database",
Long: `Use this cmd to verify data in a mariadb object`,
Example: `kubectl dba drop mariadb -n demo sample-mariadb`,
Short: "Drop data from MariaDB",
Long: `Use this cmd to drop data from a mongodb`,
Example: `kubectl dba data drop mariadb -n demo sample-mariadb`,
Run: func(cmd *cobra.Command, args []string) {
if len(args) == 0 {
log.Fatal("Enter mariadb object's name as an argument.")
Expand All @@ -233,13 +228,7 @@ func DropMariaDBDataCMD(f cmdutil.Factory) *cobra.Command {
log.Fatalln(err)
}

tunnel, err := lib.TunnelToDBService(opts.config, dbName, namespace, api.MySQLDatabasePort)
if err != nil {
log.Fatal("couldn't create tunnel, error: ", err)
}
defer tunnel.Close()

err = opts.dropDataExecCmd(tunnel)
err = opts.dropDataExecCmd()
if err != nil {
log.Fatal(err)
}
Expand All @@ -249,12 +238,12 @@ func DropMariaDBDataCMD(f cmdutil.Factory) *cobra.Command {
return mdDropCmd
}

func (opts *mariadbOpts) dropDataExecCmd(tunnel *portforward.Tunnel) error {
command := `
USE mysql;
DROP TABLE IF EXISTS kubedb_table;
`
_, err := opts.executeCommand(tunnel.Local, command)
func (opts *mariadbOpts) dropDataExecCmd() error {
command := fmt.Sprintf(`
USE %v;
DROP TABLE IF EXISTS %v;
`, KubeDBDatabaseName, KubeDBTableName)
_, err := opts.executeCommand(command)
if err != nil {
return err
}
Expand Down Expand Up @@ -319,101 +308,73 @@ func newMariaDBOpts(f cmdutil.Factory, dbName, namespace string) (*mariadbOpts,
}, nil
}

func (opts *mariadbOpts) getDockerShellCommand(localPort int, dockerFlags, mariadbExtraFlags []interface{}) (*shell.Session, error) {
sh := shell.NewSession()
sh.ShowCMD = false

func (opts *mariadbOpts) getShellCommand(command string) (string, error) {
db := opts.db
dockerCommand := []interface{}{
"run", "--network=host",
"-e", fmt.Sprintf("MYSQL_PWD=%s", opts.pass),
cmd := ""
user, password, err := opts.GetMariaDBAuthCredentials(db)
if err != nil {
return "", err
}
dockerCommand = append(dockerCommand, dockerFlags...)
containerName := "mariadb"
label := opts.db.OffshootLabels()

mariadbCommand := []interface{}{
"mysql",
"--host=127.0.0.1", fmt.Sprintf("--port=%d", localPort),
fmt.Sprintf("--user=%s", opts.username),
if *opts.db.Spec.Replicas > 1 {
label["kubedb.com/role"] = "primary"
}

pods, err := opts.client.CoreV1().Pods(db.Namespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: labels.Set.String(label),
})
if err != nil || len(pods.Items) == 0 {
return "", err
}
if db.Spec.TLS != nil {
secretName := db.CertificateName(api.MariaDBClientCert)
certSecret, err := opts.client.CoreV1().Secrets(db.Namespace).Get(context.TODO(), secretName, metav1.GetOptions{})
if err != nil {
return nil, err
}

caCrt, ok := certSecret.Data[corev1.ServiceAccountRootCAKey]
if !ok {
return nil, fmt.Errorf("missing %s in secret %s/%s", corev1.ServiceAccountRootCAKey, certSecret.Namespace, certSecret.Name)
}
err = os.WriteFile(caFile, caCrt, 0o644)
if err != nil {
return nil, err
}

crt, ok := certSecret.Data[corev1.TLSCertKey]
if !ok {
return nil, fmt.Errorf("missing %s in secret %s/%s", corev1.TLSCertKey, certSecret.Namespace, certSecret.Name)
}
err = os.WriteFile(certFile, crt, 0o644)
if err != nil {
return nil, err
}

key, ok := certSecret.Data[corev1.TLSPrivateKeyKey]
if !ok {
return nil, fmt.Errorf("missing %s in secret %s/%s", corev1.TLSPrivateKeyKey, certSecret.Namespace, certSecret.Name)
}
err = os.WriteFile(keyFile, key, 0o644)
if err != nil {
return nil, err
}

dockerCommand = append(dockerCommand,
"-v", fmt.Sprintf("%s:%s", caFile, caFile),
"-v", fmt.Sprintf("%s:%s", certFile, certFile),
"-v", fmt.Sprintf("%s:%s", keyFile, keyFile),
)
mariadbCommand = append(mariadbCommand,
fmt.Sprintf("--ssl-ca=%v", caFile),
fmt.Sprintf("--ssl-cert=%v", certFile),
fmt.Sprintf("--ssl-key=%v", keyFile),
)
cmd = fmt.Sprintf("kubectl exec -n %s %s -c %s -- mysql -u%s -p'%s' --host=%s --port=%s --ssl-ca='%v' --ssl-cert='%v' --ssl-key='%v' %s -e \"%s\"", db.Namespace, pods.Items[0].Name, containerName, user, password, "127.0.0.1", "3306", myCaFile, myCertFile, myKeyFile, api.ResourceSingularMySQL, command)
} else {
cmd = fmt.Sprintf("kubectl exec -n %s %s -c %s -- mysql -u%s -p'%s' %s -e \"%s\"", db.Namespace, pods.Items[0].Name, containerName, user, password, api.ResourceSingularMySQL, command)
}

dockerCommand = append(dockerCommand, opts.dbImage)
finalCommand := append(dockerCommand, mariadbCommand...)
if mariadbExtraFlags != nil {
finalCommand = append(finalCommand, mariadbExtraFlags...)
}
return sh.Command("docker", finalCommand...).SetStdin(os.Stdin), nil
return cmd, err
}

func (opts *mariadbOpts) executeCommand(localPort int, command string) (string, error) {
mariadbExtraFlags := []interface{}{
"-e", command,
func (opts *mariadbOpts) GetMariaDBAuthCredentials(db *api.MariaDB) (string, string, error) {
if db.Spec.AuthSecret == nil {
return "", "", errors.New("no database secret")
}

shSession, err := opts.getDockerShellCommand(localPort, nil, mariadbExtraFlags)
secret, err := opts.client.CoreV1().Secrets(db.Namespace).Get(context.TODO(), db.Spec.AuthSecret.Name, metav1.GetOptions{})
if err != nil {
return "", err
return "", "", err
}
return string(secret.Data[corev1.BasicAuthUsernameKey]), string(secret.Data[corev1.BasicAuthPasswordKey]), nil
}

out, err := shSession.Output()
func (opts *mariadbOpts) executeCommand(command string) ([]byte, error) {
cmd, err := opts.getShellCommand(command)
if err != nil {
return "", fmt.Errorf("failed to execute file, error: %s, output: %s\n", err, out)
return nil, err
}

output := ""
if string(out) != "" {
output = ", output:\n\n" + string(out)
output, err := opts.runCMD(cmd)
if err != nil {
return nil, err
}
return output, nil
}

errOutput := opts.errWriter.String()
if errOutput != "" {
return "", fmt.Errorf("failed to execute command, stderr: %s%s", errOutput, output)
func (opts *mariadbOpts) runCMD(cmd string) ([]byte, error) {
sh := exec.Command("/bin/sh", "-c", cmd)
stdout := bytes.NewBuffer(nil)
stderr := bytes.NewBuffer(nil)
sh.Stdout = stdout
sh.Stderr = stderr
err := sh.Run()
out := stdout.Bytes()
errOut := stderr.Bytes()
errOutput := string(errOut)
if errOutput != "" && !strings.Contains(errOutput, "NOTICE") && !strings.Contains(errOutput, "Warning") {
return nil, fmt.Errorf("failed to execute command, stderr: %s", errOutput)
}

return string(out), nil
if err != nil {
return nil, err
}
return out, nil
}
Loading

0 comments on commit 41dbfff

Please sign in to comment.