diff --git a/.gitignore b/.gitignore index 1521c8b7..b992d0c5 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ dist +.tmp diff --git a/agent.go b/agent.go index 3d00aa40..cec2892b 100644 --- a/agent.go +++ b/agent.go @@ -27,6 +27,13 @@ type ( EdgeKeySet bool } + // EdgeStackConfig represnts an Edge stack config + EdgeStackConfig struct { + Name string + FileContent string + Prune bool + } + // AgentMetadata is the representation of the metadata object used to decorate // all the objects in the response of a Docker aggregated resource request. Metadata struct { @@ -71,6 +78,18 @@ type ( Credentials string } + InfoTags struct { + AgentPort string + EdgeKeySet bool + EngineStatus EngineStatus + Leader bool + NodeName string + NodeRole NodeRole + } + + EngineStatus int + NodeRole int + // OptionParser is used to parse options. OptionParser interface { Options() (*Options, error) @@ -81,11 +100,11 @@ type ( Create(advertiseAddr string, joinAddr []string) error Members() []ClusterMember Leave() - GetMemberByRole(role string) *ClusterMember + GetMemberByRole(role NodeRole) *ClusterMember GetMemberByNodeName(nodeName string) *ClusterMember GetMemberWithEdgeKeySet() *ClusterMember - GetTags() map[string]string - UpdateTags(tags map[string]string) error + GetTags() *InfoTags + UpdateTags(tags *InfoTags) error } // DigitalSignatureService is used to validate digital signatures. @@ -95,7 +114,7 @@ type ( // InfoService is used to retrieve information from a Docker environment. InfoService interface { - GetInformationFromDockerEngine() (map[string]string, error) + GetInformationFromDockerEngine() (*InfoTags, error) GetContainerIpFromDockerEngine(containerName string, ignoreNonSwarmNetworks bool) (string, error) GetServiceNameFromDockerEngine(containerName string) (string, error) } @@ -119,26 +138,23 @@ type ( IsTunnelOpen() bool } - // TunnelOperator is a service that is used to communicate with a Portainer instance and to manage - // the reverse tunnel. - TunnelOperator interface { - Start() error - IsKeySet() bool - SetKey(key string) error - GetKey() string - CloseTunnel() error - ResetActivityTimer() - } - // Scheduler is used to manage schedules Scheduler interface { Schedule(schedules []Schedule) error } + + // DockerStackService is a service used to deploy and remove Docker stacks + DockerStackService interface { + Login() error + Logout() error + Deploy(name, stackFileContent string, prune bool) error + Remove(name string) error + } ) const ( // Version represents the version of the agent. - Version = "1.5.1" + Version = "1.6.0" // APIVersion represents the version of the agent's API. APIVersion = "2" // DefaultAgentAddr is the default address used by the Agent API server. @@ -157,6 +173,8 @@ const ( DefaultEdgePollInterval = "5s" // DefaultEdgeSleepInterval is the default interval after which the agent will close the tunnel if no activity. DefaultEdgeSleepInterval = "5m" + // DefaultConfigCheckInterval is the default interval used to check if node config changed + DefaultConfigCheckInterval = "5s" // SupportedDockerAPIVersion is the minimum Docker API version supported by the agent. SupportedDockerAPIVersion = "1.24" // HTTPTargetHeaderName is the name of the header used to specify a target node. @@ -185,24 +203,6 @@ const ( // ResponseMetadataKey is the JSON field used to store any Portainer related information in // response objects. ResponseMetadataKey = "Portainer" - // MemberTagKeyAgentPort is the name of the label storing information about the port exposed - // by the agent. - MemberTagKeyAgentPort = "AgentPort" - // MemberTagKeyNodeName is the name of the label storing information about the name of the - // node where the agent is running. - MemberTagKeyNodeName = "NodeName" - // MemberTagKeyNodeRole is the name of the label storing information about the role of the - // node where the agent is running. - MemberTagKeyNodeRole = "NodeRole" - // MemberTagEngineStatus is the name of the label storing information about the status of the Docker engine where - // the agent is running. Possible values are "standalone" or "swarm". - MemberTagEngineStatus = "EngineStatus" - // MemberTagEdgeKeySet is the name of the label storing information regarding the association of an Edge key. - MemberTagEdgeKeySet = "EdgeKeySet" - // NodeRoleManager represents a manager node. - NodeRoleManager = "manager" - // NodeRoleWorker represents a worker node. - NodeRoleWorker = "worker" // TLSCertPath is the default path to the TLS certificate file. TLSCertPath = "cert.pem" // TLSKeyPath is the default path to the TLS key file. @@ -213,4 +213,22 @@ const ( DataDirectory = "/data" // EdgeKeyFile is the name of the file used to persist the Edge key associated to the agent. EdgeKeyFile = "agent_edge_key" + // DockerBinaryPath is the path of the docker binary + DockerBinaryPath = "/app" + // EdgeStackFilesPath is the path where edge stack files are saved + EdgeStackFilesPath = "/tmp/edge_stacks" + // EdgeStackQueueSleepInterval is the interval used to check if there's an Edge stack to deploy + EdgeStackQueueSleepInterval = "5s" +) + +const ( + _ NodeRole = iota + NodeRoleManager + NodeRoleWorker +) + +const ( + _ EngineStatus = iota + EngineStatusStandalone + EngineStatusSwarm ) diff --git a/build/linux/Dockerfile b/build/linux/Dockerfile index 60d0221b..3a2dcdd0 100644 --- a/build/linux/Dockerfile +++ b/build/linux/Dockerfile @@ -2,7 +2,7 @@ FROM portainer/base WORKDIR /app -COPY dist/agent /app/ +COPY dist /app/ COPY static /app/static ENTRYPOINT ["./agent"] diff --git a/build/linux/alpine.Dockerfile b/build/linux/alpine.Dockerfile new file mode 100644 index 00000000..ca3eda1b --- /dev/null +++ b/build/linux/alpine.Dockerfile @@ -0,0 +1,8 @@ +FROM alpine:latest + +WORKDIR /app + +COPY dist /app/ +COPY static /app/static + +ENTRYPOINT ["./agent"] diff --git a/build/windows/Dockerfile b/build/windows/Dockerfile index c4b27405..8712e6ca 100644 --- a/build/windows/Dockerfile +++ b/build/windows/Dockerfile @@ -4,7 +4,7 @@ USER ContainerAdministrator WORKDIR /app -COPY dist/agent.exe /app/ +COPY dist /app/ COPY static /app/static ENTRYPOINT ["C:/app/agent.exe"] diff --git a/cmd/agent/main.go b/cmd/agent/main.go index b882eb05..a937af1d 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -12,7 +12,7 @@ import ( "github.com/portainer/agent/ghw" "github.com/portainer/agent/http" "github.com/portainer/agent/http/client" - "github.com/portainer/agent/http/tunnel" + "github.com/portainer/agent/internal/edge" "github.com/portainer/agent/logutils" "github.com/portainer/agent/net" "github.com/portainer/agent/os" @@ -28,15 +28,17 @@ func main() { logutils.SetupLogger(options.LogLevel) var infoService agent.InfoService = docker.NewInfoService() - agentTags, err := retrieveInformationFromDockerEnvironment(infoService) + + agentTags, err := infoService.GetInformationFromDockerEngine() if err != nil { log.Fatalf("[ERROR] [main,docker] [message: Unable to retrieve information from Docker] [error: %s]", err) } - agentTags[agent.MemberTagKeyAgentPort] = options.AgentServerPort + + agentTags.AgentPort = options.AgentServerPort log.Printf("[DEBUG] [main,configuration] [Member tags: %+v]", agentTags) clusterMode := false - if agentTags[agent.MemberTagEngineStatus] == "swarm" { + if agentTags.EngineStatus == agent.EngineStatusSwarm { clusterMode = true log.Println("[INFO] [main] [message: Agent running on a Swarm cluster node. Running in cluster mode]") } @@ -84,35 +86,38 @@ func main() { defer clusterService.Leave() } - var tunnelOperator agent.TunnelOperator + edgeManager := edge.NewManager(options, advertiseAddr, clusterService, infoService) + if options.EdgeMode { - apiServerAddr := fmt.Sprintf("%s:%s", advertiseAddr, options.AgentServerPort) - - operatorConfig := &tunnel.OperatorConfig{ - APIServerAddr: apiServerAddr, - EdgeID: options.EdgeID, - PollFrequency: agent.DefaultEdgePollInterval, - InactivityTimeout: options.EdgeInactivityTimeout, - InsecurePoll: options.EdgeInsecurePoll, + edgeKey, err := retrieveEdgeKey(options.EdgeKey, clusterService) + if err != nil { + log.Printf("[ERROR] [main,edge] [message: Unable to retrieve Edge key] [error: %s]", err) } - log.Printf("[DEBUG] [main,edge,configuration] [api_addr: %s] [edge_id: %s] [poll_frequency: %s] [inactivity_timeout: %s] [insecure_poll: %t]", operatorConfig.APIServerAddr, operatorConfig.EdgeID, operatorConfig.PollFrequency, operatorConfig.InactivityTimeout, operatorConfig.InsecurePoll) + if edgeKey != "" { + log.Println("[DEBUG] [main,edge] [message: Edge key found in environment. Associating Edge key]") - tunnelOperator, err = tunnel.NewTunnelOperator(operatorConfig) - if err != nil { - log.Fatalf("[ERROR] [main,edge,rtunnel] [message: Unable to create tunnel operator] [error: %s]", err) - } + err := edgeManager.SetKey(edgeKey) + if err != nil { + log.Fatalf("[ERROR] [main,edge] [message: Unable to associate Edge key] [error: %s]", err) + } - err := enableEdgeMode(tunnelOperator, clusterService, options) - if err != nil { - log.Fatalf("[ERROR] [main,edge,rtunnel] [message: Unable to start agent in Edge mode] [error: %s]", err) + err = edgeManager.Start() + if err != nil { + log.Fatalf("[ERROR] [main,edge] [message: Unable to start Edge manager] [error: %s]", err) + } + + } else { + log.Println("[DEBUG] [main,edge] [message: Edge key not specified. Serving Edge UI]") + + serveEdgeUI(edgeManager, options.EdgeServerAddr, options.EdgeServerPort) } } systemService := ghw.NewSystemService(agent.HostRoot) var signatureService agent.DigitalSignatureService - if !options.EdgeMode { + if !edgeManager.IsEdgeModeEnabled() { signatureService = crypto.NewECDSAService(options.SharedSecret) tlsService := crypto.TLSService{} @@ -127,14 +132,13 @@ func main() { Port: options.AgentServerPort, SystemService: systemService, ClusterService: clusterService, + EdgeManager: edgeManager, SignatureService: signatureService, - TunnelOperator: tunnelOperator, AgentTags: agentTags, AgentOptions: options, - EdgeMode: options.EdgeMode, } - if options.EdgeMode { + if edgeManager.IsEdgeModeEnabled() { config.Addr = advertiseAddr } @@ -147,47 +151,25 @@ func main() { func startAPIServer(config *http.APIServerConfig) error { server := http.NewAPIServer(config) - if config.EdgeMode { + if config.EdgeManager.IsEdgeModeEnabled() { return server.StartUnsecured() } return server.StartSecured() } -func enableEdgeMode(tunnelOperator agent.TunnelOperator, clusterService agent.ClusterService, options *agent.Options) error { - - edgeKey, err := retrieveEdgeKey(options, clusterService) - if err != nil { - return err - } - - if edgeKey != "" { - log.Println("[DEBUG] [main,edge] [message: Edge key available. Starting tunnel operator.]") - - err := tunnelOperator.SetKey(edgeKey) - if err != nil { - return err - } - - if clusterService != nil { - tags := clusterService.GetTags() - tags[agent.MemberTagEdgeKeySet] = "set" - err = clusterService.UpdateTags(tags) - if err != nil { - return err - } - } - - return tunnelOperator.Start() - } +func parseOptions() (*agent.Options, error) { + optionParser := os.NewEnvOptionParser() + return optionParser.Options() +} - log.Println("[DEBUG] [main,edge] [message: Edge key not specified. Serving Edge UI]") - edgeServer := http.NewEdgeServer(tunnelOperator, clusterService) +func serveEdgeUI(edgeManager *edge.Manager, serverAddr, serverPort string) { + edgeServer := http.NewEdgeServer(edgeManager) go func() { - log.Printf("[INFO] [main,edge,http] [server_address: %s] [server_port: %s] [message: Starting Edge server]", options.EdgeServerAddr, options.EdgeServerPort) + log.Printf("[INFO] [main,edge,http] [server_address: %s] [server_port: %s] [message: Starting Edge server]", serverAddr, serverPort) - err := edgeServer.Start(options.EdgeServerAddr, options.EdgeServerPort) + err := edgeServer.Start(serverAddr, serverPort) if err != nil { log.Fatalf("[ERROR] [main,edge,http] [message: Unable to start Edge server] [error: %s]", err) } @@ -199,37 +181,32 @@ func enableEdgeMode(tunnelOperator agent.TunnelOperator, clusterService agent.Cl timer1 := time.NewTimer(agent.DefaultEdgeSecurityShutdown * time.Minute) <-timer1.C - if !tunnelOperator.IsKeySet() { + if !edgeManager.IsKeySet() { log.Printf("[INFO] [main,edge,http] [message: Shutting down Edge UI server as no key was specified after %d minutes]", agent.DefaultEdgeSecurityShutdown) edgeServer.Shutdown() } }() - - return nil } -func retrieveEdgeKey(options *agent.Options, clusterService agent.ClusterService) (string, error) { - edgeKey := options.EdgeKey +func retrieveEdgeKey(edgeKey string, clusterService agent.ClusterService) (string, error) { - if options.EdgeKey != "" { + if edgeKey != "" { log.Println("[INFO] [main,edge] [message: Edge key loaded from options]") - edgeKey = options.EdgeKey + return edgeKey, nil } - if edgeKey == "" { - var keyRetrievalError error + var keyRetrievalError error + + edgeKey, keyRetrievalError = retrieveEdgeKeyFromFilesystem() + if keyRetrievalError != nil { + return "", keyRetrievalError + } - edgeKey, keyRetrievalError = retrieveEdgeKeyFromFilesystem() + if edgeKey == "" && clusterService != nil { + edgeKey, keyRetrievalError = retrieveEdgeKeyFromCluster(clusterService) if keyRetrievalError != nil { return "", keyRetrievalError } - - if edgeKey == "" && clusterService != nil { - edgeKey, keyRetrievalError = retrieveEdgeKeyFromCluster(clusterService) - if keyRetrievalError != nil { - return "", keyRetrievalError - } - } } return edgeKey, nil @@ -278,17 +255,3 @@ func retrieveEdgeKeyFromCluster(clusterService agent.ClusterService) (string, er return edgeKey, nil } - -func parseOptions() (*agent.Options, error) { - optionParser := os.NewEnvOptionParser() - return optionParser.Options() -} - -func retrieveInformationFromDockerEnvironment(infoService agent.InfoService) (map[string]string, error) { - agentTags, err := infoService.GetInformationFromDockerEngine() - if err != nil { - return nil, err - } - - return agentTags, nil -} diff --git a/dev.sh b/dev.sh index 79b4b4d9..aa21c631 100755 --- a/dev.sh +++ b/dev.sh @@ -2,23 +2,31 @@ LOG_LEVEL=DEBUG CAP_HOST_MANAGEMENT=1 #Enabled by default. Change this to anything else to disable this feature -EDGE=0 +EDGE=1 TMP="/tmp" GIT_COMMIT_HASH=`git rev-parse --short HEAD` GIT_BRANCH_NAME=`git rev-parse --abbrev-ref HEAD` IMAGE_NAME="portainerci/agent:${GIT_BRANCH_NAME}-${GIT_COMMIT_HASH}" -if [[ $# -ne 1 ]] ; then - echo "Usage: $(basename $0) " - exit 1 + +MODE="swarm" +if [[ $# -gt 0 ]] ; then + MODE=$1 +fi + +SKIP_COMPILE=false +if [[ $# -eq 2 ]] ; then + SKIP_COMPILE=true fi -MODE=$1 +if [[ "${MODE}" == 'help' ]]; then + echo "Usage: $(basename $0) " + exit 0 +fi function compile() { echo "Compilation..." - rm -rf dist/* cd cmd/agent GOOS="linux" GOARCH="amd64" CGO_ENABLED=0 go build -a --installsuffix cgo --ldflags '-s' rc=$?; if [[ $rc != 0 ]]; then exit $rc; fi @@ -36,8 +44,6 @@ function deploy_local() { echo "Image build..." docker build --no-cache -t "${IMAGE_NAME}" -f build/linux/Dockerfile . -# docker push "${IMAGE_NAME}" - echo "Deployment..." docker run -d --name portainer-agent-dev \ @@ -58,7 +64,7 @@ function deploy_local() { function deploy_swarm() { DOCKER_MANAGER=tcp://10.0.7.10 DOCKER_NODE=tcp://10.0.7.11 -# DOCKER_NODE2=tcp://10.0.7.12 + # DOCKER_NODE2=tcp://10.0.7.12 EDGE_ID="a1a1c817-7f89-43b1-97e5-508d96c00be9" # generated via uuidgen @@ -70,14 +76,14 @@ function deploy_swarm() { docker -H "${DOCKER_MANAGER}:2375" network rm portainer-agent-dev-net docker -H "${DOCKER_MANAGER}:2375" rmi -f "${IMAGE_NAME}" docker -H "${DOCKER_NODE}:2375" rmi -f "${IMAGE_NAME}" -# docker -H "${DOCKER_NODE2}:2375" rmi -f "${IMAGE_NAME}" + # docker -H "${DOCKER_NODE2}:2375" rmi -f "${IMAGE_NAME}" echo "Building image locally and exporting to Swarm cluster..." docker build --no-cache -t "${IMAGE_NAME}" -f build/linux/Dockerfile . docker save "${IMAGE_NAME}" -o "${TMP}/portainer-agent.tar" docker -H "${DOCKER_MANAGER}:2375" load -i "${TMP}/portainer-agent.tar" docker -H "${DOCKER_NODE}:2375" load -i "${TMP}/portainer-agent.tar" -# docker -H "${DOCKER_NODE2}:2375" load -i "${TMP}/portainer-agent.tar" + # docker -H "${DOCKER_NODE2}:2375" load -i "${TMP}/portainer-agent.tar" echo "Sleep..." sleep 5 @@ -99,13 +105,15 @@ function deploy_swarm() { --publish mode=host,published=80,target=80 \ "${IMAGE_NAME}" -# --mount type=volume,src=portainer_agent_data,dst=/data \ + # --mount type=volume,src=portainer_agent_data,dst=/data \ docker -H "${DOCKER_MANAGER}:2375" service logs -f portainer-agent-dev } function main() { - compile + if [[ $SKIP_COMPILE == false ]]; then + compile + fi if [[ "${MODE}" == 'local' ]]; then deploy_local diff --git a/docker/docker.go b/docker/docker.go index 9607bcbd..763bcf9e 100644 --- a/docker/docker.go +++ b/docker/docker.go @@ -25,7 +25,7 @@ func NewInfoService() *InfoService { // GetInformationFromDockerEngine retrieves information from a Docker environment // and returns a map of labels. -func (service *InfoService) GetInformationFromDockerEngine() (map[string]string, error) { +func (service *InfoService) GetInformationFromDockerEngine() (*agent.InfoTags, error) { cli, err := client.NewClientWithOpts(client.FromEnv, client.WithVersion(agent.SupportedDockerAPIVersion)) if err != nil { return nil, err @@ -37,17 +37,18 @@ func (service *InfoService) GetInformationFromDockerEngine() (map[string]string, return nil, err } - info := make(map[string]string) - info[agent.MemberTagKeyNodeName] = dockerInfo.Name + info := &agent.InfoTags{} + info.NodeName = dockerInfo.Name if dockerInfo.Swarm.NodeID == "" { - info[agent.MemberTagEngineStatus] = "standalone" + getStandaloneInfo(info) } else { - info[agent.MemberTagEngineStatus] = "swarm" - info[agent.MemberTagKeyNodeRole] = agent.NodeRoleWorker - if dockerInfo.Swarm.ControlAvailable { - info[agent.MemberTagKeyNodeRole] = agent.NodeRoleManager + + err := getSwarmInformation(info, dockerInfo, cli) + if err != nil { + return nil, err } + } return info, nil @@ -109,3 +110,27 @@ func (service *InfoService) GetServiceNameFromDockerEngine(containerName string) return containerInspect.Config.Labels[serviceNameLabel], nil } + +func getStandaloneInfo(info *agent.InfoTags) { + info.EngineStatus = agent.EngineStatusStandalone +} + +func getSwarmInformation(info *agent.InfoTags, dockerInfo types.Info, cli *client.Client) error { + info.EngineStatus = agent.EngineStatusSwarm + info.NodeRole = agent.NodeRoleWorker + + if dockerInfo.Swarm.ControlAvailable { + info.NodeRole = agent.NodeRoleManager + + node, _, err := cli.NodeInspectWithRaw(context.Background(), dockerInfo.Swarm.NodeID) + if err != nil { + return err + } + + if node.ManagerStatus.Leader { + info.Leader = true + } + } + + return nil +} diff --git a/exec/stack.go b/exec/stack.go new file mode 100644 index 00000000..f4601ca6 --- /dev/null +++ b/exec/stack.go @@ -0,0 +1,85 @@ +package exec + +import ( + "bytes" + "errors" + "os/exec" + "path" + "runtime" +) + +// DockerStackService represents a service for managing stacks by using the Docker binary. +type DockerStackService struct { + binaryPath string +} + +// NewDockerStackService initializes a new DockerStackService service. +// It also updates the configuration of the Docker CLI binary. +func NewDockerStackService(binaryPath string) (*DockerStackService, error) { + service := &DockerStackService{ + binaryPath: binaryPath, + } + + return service, nil +} + +// Login executes the docker login command against a list of registries (including DockerHub). +func (service *DockerStackService) Login() error { + // Not implemented yet. + return nil +} + +// Logout executes the docker logout command. +func (service *DockerStackService) Logout() error { + command := service.prepareDockerCommand(service.binaryPath) + args := []string{"logout"} + return runCommandAndCaptureStdErr(command, args, "") + +} + +// Deploy executes the docker stack deploy command. +func (service *DockerStackService) Deploy(name, stackFilePath string, prune bool) error { + command := service.prepareDockerCommand(service.binaryPath) + + args := []string{} + if prune { + args = append(args, "stack", "deploy", "--prune", "--with-registry-auth", "--compose-file", stackFilePath, name) + } else { + args = append(args, "stack", "deploy", "--with-registry-auth", "--compose-file", stackFilePath, name) + } + + stackFolder := path.Dir(stackFilePath) + return runCommandAndCaptureStdErr(command, args, stackFolder) +} + +// Remove executes the docker stack rm command. +func (service *DockerStackService) Remove(name string) error { + command := service.prepareDockerCommand(service.binaryPath) + args := []string{"stack", "rm", name} + return runCommandAndCaptureStdErr(command, args, "") +} + +func runCommandAndCaptureStdErr(command string, args []string, workingDir string) error { + var stderr bytes.Buffer + cmd := exec.Command(command, args...) + cmd.Stderr = &stderr + cmd.Dir = workingDir + + err := cmd.Run() + if err != nil { + return errors.New(stderr.String()) + } + + return nil +} + +func (service *DockerStackService) prepareDockerCommand(binaryPath string) string { + // Assume Linux as a default + command := path.Join(binaryPath, "docker") + + if runtime.GOOS == "windows" { + command = path.Join(binaryPath, "docker.exe") + } + + return command +} diff --git a/http/client/portainer_client.go b/http/client/portainer_client.go new file mode 100644 index 00000000..c17d6cef --- /dev/null +++ b/http/client/portainer_client.go @@ -0,0 +1,119 @@ +package client + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "log" + "net/http" + "strconv" + "time" + + "github.com/portainer/agent" +) + +// PortainerClient is used to execute HTTP requests against the Portainer API +type PortainerClient struct { + httpClient *http.Client + serverAddress string + endpointID string + edgeID string +} + +// NewPortainerClient returns a pointer to a new PortainerClient instance +func NewPortainerClient(serverAddress, endpointID, edgeID string) *PortainerClient { + return &PortainerClient{ + serverAddress: serverAddress, + endpointID: endpointID, + edgeID: edgeID, + httpClient: &http.Client{ + Timeout: 10 * time.Second, + }, + } +} + +type stackConfigResponse struct { + Name string + StackFileContent string + Prune bool +} + +// GetEdgeStackConfig retrieves the configuration associated to an Edge stack +func (client *PortainerClient) GetEdgeStackConfig(edgeStackID int) (*agent.EdgeStackConfig, error) { + requestURL := fmt.Sprintf("%s/api/endpoints/%s/edge/stacks/%d", client.serverAddress, client.endpointID, edgeStackID) + + req, err := http.NewRequest(http.MethodGet, requestURL, nil) + if err != nil { + return nil, err + } + + req.Header.Set(agent.HTTPEdgeIdentifierHeaderName, client.edgeID) + + resp, err := client.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + log.Printf("[ERROR] [http,client,portainer] [response_code: %d] [message: GetEdgeStackConfig operation failed]", resp.StatusCode) + return nil, errors.New("GetEdgeStackConfig operation failed") + } + + var data stackConfigResponse + err = json.NewDecoder(resp.Body).Decode(&data) + if err != nil { + return nil, err + } + + return &agent.EdgeStackConfig{Name: data.Name, FileContent: data.StackFileContent, Prune: data.Prune}, nil +} + +type setEdgeStackStatusPayload struct { + Error string + Status int + EndpointID int +} + +// SetEdgeStackStatus updates the status of an Edge stack on the Portainer server +func (client *PortainerClient) SetEdgeStackStatus(edgeStackID, edgeStackStatus int, error string) error { + endpointID, err := strconv.Atoi(client.endpointID) + if err != nil { + return err + } + + payload := setEdgeStackStatusPayload{ + Error: error, + Status: edgeStackStatus, + EndpointID: endpointID, + } + + data, err := json.Marshal(payload) + if err != nil { + return err + } + + requestURL := fmt.Sprintf("%s/api/edge_stacks/%d/status", client.serverAddress, edgeStackID) + + req, err := http.NewRequest(http.MethodPut, requestURL, bytes.NewReader(data)) + if err != nil { + return err + } + + req.Header.Set(agent.HTTPEdgeIdentifierHeaderName, client.edgeID) + + resp, err := client.httpClient.Do(req) + if err != nil { + return err + } + + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + log.Printf("[ERROR] [http,client,portainer] [response_code: %d] [message: SetEdgeStackStatus operation failed]", resp.StatusCode) + return errors.New("SetEdgeStackStatus operation failed") + } + + return nil +} diff --git a/http/edge.go b/http/edge.go index 3a43e054..59200742 100644 --- a/http/edge.go +++ b/http/edge.go @@ -2,29 +2,25 @@ package http import ( "context" - "fmt" "log" "net/http" "time" - "github.com/portainer/agent/http/client" + "github.com/portainer/agent/internal/edge" "github.com/gorilla/mux" - "github.com/portainer/agent" ) // EdgeServer expose an UI to associate an Edge key with the agent. type EdgeServer struct { - httpServer *http.Server - tunnelOperator agent.TunnelOperator - clusterService agent.ClusterService + httpServer *http.Server + edgeManager *edge.Manager } // NewEdgeServer returns a pointer to a new instance of EdgeServer. -func NewEdgeServer(tunnelOperator agent.TunnelOperator, clusterService agent.ClusterService) *EdgeServer { +func NewEdgeServer(edgeManager *edge.Manager) *EdgeServer { return &EdgeServer{ - tunnelOperator: tunnelOperator, - clusterService: clusterService, + edgeManager: edgeManager, } } @@ -59,47 +55,28 @@ func (server *EdgeServer) handleKeySetup() http.HandlerFunc { return } - err = server.tunnelOperator.SetKey(key) + err = server.edgeManager.SetKey(key) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - if server.clusterService != nil { - tags := server.clusterService.GetTags() - tags[agent.MemberTagEdgeKeySet] = "set" - err = server.clusterService.UpdateTags(tags) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - go server.propagateKeyInCluster(tags[agent.MemberTagKeyNodeName], key) + err = server.edgeManager.Start() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) } - go server.tunnelOperator.Start() + go server.propagateKeyInCluster() w.Write([]byte("Agent setup OK. You can close this page.")) server.Shutdown() } } -func (server *EdgeServer) propagateKeyInCluster(currentNodeName, key string) { - httpCli := client.NewAPIClient() - - members := server.clusterService.Members() - for _, member := range members { - - if member.NodeName == currentNodeName || member.EdgeKeySet { - continue - } - - memberAddr := fmt.Sprintf("%s:%s", member.IPAddress, member.Port) - - err := httpCli.SetEdgeKey(memberAddr, key) - if err != nil { - log.Printf("[ERROR] [edge,http] [member_address: %s] [message: Unable to propagate key to cluster member] [err: %s]", memberAddr, err) - } +func (server *EdgeServer) propagateKeyInCluster() { + err := server.edgeManager.PropagateKeyInCluster() + if err != nil { + log.Printf("[ERROR] [edge,http] [message: Unable to propagate key to cluster] [err: %s]", err) } } diff --git a/http/handler/docker/docker_operation.go b/http/handler/docker/docker_operation.go index 28faaa80..e6665513 100644 --- a/http/handler/docker/docker_operation.go +++ b/http/handler/docker/docker_operation.go @@ -59,7 +59,7 @@ func (handler *Handler) dispatchOperation(rw http.ResponseWriter, request *http. } func (handler *Handler) executeOperationOnManagerNode(rw http.ResponseWriter, request *http.Request) *httperror.HandlerError { - if handler.agentTags[agent.MemberTagKeyNodeRole] == agent.NodeRoleManager { + if handler.agentTags.NodeRole == agent.NodeRoleManager { handler.dockerProxy.ServeHTTP(rw, request) } else { targetMember := handler.clusterService.GetMemberByRole(agent.NodeRoleManager) @@ -75,7 +75,7 @@ func (handler *Handler) executeOperationOnManagerNode(rw http.ResponseWriter, re func (handler *Handler) executeOperationOnNode(rw http.ResponseWriter, request *http.Request) *httperror.HandlerError { agentTargetHeader := request.Header.Get(agent.HTTPTargetHeaderName) - if agentTargetHeader == handler.agentTags[agent.MemberTagKeyNodeName] || agentTargetHeader == "" { + if agentTargetHeader == handler.agentTags.NodeName || agentTargetHeader == "" { handler.dockerProxy.ServeHTTP(rw, request) } else { targetMember := handler.clusterService.GetMemberByNodeName(agentTargetHeader) @@ -92,7 +92,7 @@ func (handler *Handler) executeOperationOnNode(rw http.ResponseWriter, request * func (handler *Handler) executeOperationOnCluster(rw http.ResponseWriter, request *http.Request) *httperror.HandlerError { agentTargetHeader := request.Header.Get(agent.HTTPTargetHeaderName) - if agentTargetHeader == handler.agentTags[agent.MemberTagKeyNodeName] { + if agentTargetHeader == handler.agentTags.NodeName { handler.dockerProxy.ServeHTTP(rw, request) return nil } diff --git a/http/handler/docker/handler.go b/http/handler/docker/handler.go index 683f07d9..f9ab0e03 100644 --- a/http/handler/docker/handler.go +++ b/http/handler/docker/handler.go @@ -14,13 +14,13 @@ type Handler struct { dockerProxy *proxy.LocalProxy clusterProxy *proxy.ClusterProxy clusterService agent.ClusterService - agentTags map[string]string + agentTags *agent.InfoTags useTLS bool } // NewHandler returns a new instance of Handler. // It sets the associated handle functions for all the Docker related HTTP endpoints. -func NewHandler(clusterService agent.ClusterService, agentTags map[string]string, notaryService *security.NotaryService, useTLS bool) *Handler { +func NewHandler(clusterService agent.ClusterService, agentTags *agent.InfoTags, notaryService *security.NotaryService, useTLS bool) *Handler { h := &Handler{ Router: mux.NewRouter(), dockerProxy: proxy.NewLocalProxy(), diff --git a/http/handler/handler.go b/http/handler/handler.go index d3dcd9e4..09d91636 100644 --- a/http/handler/handler.go +++ b/http/handler/handler.go @@ -17,6 +17,7 @@ import ( "github.com/portainer/agent/http/handler/websocket" "github.com/portainer/agent/http/proxy" "github.com/portainer/agent/http/security" + "github.com/portainer/agent/internal/edge" httperror "github.com/portainer/libhttp/error" ) @@ -32,7 +33,7 @@ type Handler struct { hostHandler *host.Handler pingHandler *ping.Handler securedProtocol bool - tunnelOperator agent.TunnelOperator + edgeManager *edge.Manager } // Config represents a server handler configuration @@ -41,11 +42,10 @@ type Config struct { SystemService agent.SystemService ClusterService agent.ClusterService SignatureService agent.DigitalSignatureService - TunnelOperator agent.TunnelOperator - AgentTags map[string]string + EdgeManager *edge.Manager + AgentTags *agent.InfoTags AgentOptions *agent.Options Secured bool - EdgeMode bool } var dockerAPIVersionRegexp = regexp.MustCompile(`(/v[0-9]\.[0-9]*)?`) @@ -60,12 +60,12 @@ func NewHandler(config *Config) *Handler { browseHandler: browse.NewHandler(agentProxy, notaryService, config.AgentOptions), browseHandlerV1: browse.NewHandlerV1(agentProxy, notaryService), dockerProxyHandler: docker.NewHandler(config.ClusterService, config.AgentTags, notaryService, config.Secured), - keyHandler: key.NewHandler(config.TunnelOperator, config.ClusterService, notaryService, config.EdgeMode), + keyHandler: key.NewHandler(notaryService, config.EdgeManager), webSocketHandler: websocket.NewHandler(config.ClusterService, config.AgentTags, notaryService), hostHandler: host.NewHandler(config.SystemService, agentProxy, notaryService), pingHandler: ping.NewHandler(), securedProtocol: config.Secured, - tunnelOperator: config.TunnelOperator, + edgeManager: config.EdgeManager, } } @@ -75,13 +75,13 @@ func (h *Handler) ServeHTTP(rw http.ResponseWriter, request *http.Request) { return } - if !h.securedProtocol && !h.tunnelOperator.IsKeySet() { + if !h.securedProtocol && !h.edgeManager.IsKeySet() { httperror.WriteError(rw, http.StatusForbidden, "Unable to use the unsecured agent API without Edge key", errors.New("edge key not set")) return } - if h.tunnelOperator != nil { - h.tunnelOperator.ResetActivityTimer() + if h.edgeManager.IsEdgeModeEnabled() { + h.edgeManager.ResetActivityTimer() } request.URL.Path = dockerAPIVersionRegexp.ReplaceAllString(request.URL.Path, "") diff --git a/http/handler/key/handler.go b/http/handler/key/handler.go index d76e1128..f0040b7c 100644 --- a/http/handler/key/handler.go +++ b/http/handler/key/handler.go @@ -3,31 +3,26 @@ package key import ( "net/http" - "github.com/portainer/agent" - "github.com/gorilla/mux" "github.com/portainer/agent/http/security" + "github.com/portainer/agent/internal/edge" httperror "github.com/portainer/libhttp/error" ) // Handler is the HTTP handler used to handle Edge key operations. type Handler struct { *mux.Router - tunnelOperator agent.TunnelOperator - clusterService agent.ClusterService - edgeMode bool + edgeManager *edge.Manager } // NewHandler returns a pointer to an Handler // It sets the associated handle functions for all the Edge key related HTTP endpoints. // This handler is meant to be used when the agent is started in Edge mode, all the API endpoints will return // a HTTP 503 service not available if edge mode is disabled. -func NewHandler(tunnelOperator agent.TunnelOperator, clusterService agent.ClusterService, notaryService *security.NotaryService, edgeMode bool) *Handler { +func NewHandler(notaryService *security.NotaryService, edgeManager *edge.Manager) *Handler { h := &Handler{ - Router: mux.NewRouter(), - tunnelOperator: tunnelOperator, - clusterService: clusterService, - edgeMode: edgeMode, + Router: mux.NewRouter(), + edgeManager: edgeManager, } h.Handle("/key", diff --git a/http/handler/key/key_create.go b/http/handler/key/key_create.go index c2c62b2b..372a73b9 100644 --- a/http/handler/key/key_create.go +++ b/http/handler/key/key_create.go @@ -5,8 +5,6 @@ import ( "log" "net/http" - "github.com/portainer/agent" - "github.com/portainer/libhttp/request" httperror "github.com/portainer/libhttp/error" @@ -25,11 +23,11 @@ func (payload *keyCreatePayload) Validate(r *http.Request) error { } func (handler *Handler) keyCreate(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { - if !handler.edgeMode { + if !handler.edgeManager.IsEdgeModeEnabled() { return &httperror.HandlerError{http.StatusServiceUnavailable, "Edge key management is disabled on non Edge agent", errors.New("Edge key management is disabled")} } - if handler.tunnelOperator.IsKeySet() { + if handler.edgeManager.IsKeySet() { return &httperror.HandlerError{http.StatusConflict, "An Edge key is already associated to this agent", errors.New("Edge key already associated")} } @@ -41,21 +39,15 @@ func (handler *Handler) keyCreate(w http.ResponseWriter, r *http.Request) *httpe return &httperror.HandlerError{http.StatusBadRequest, "Invalid request payload", err} } - err = handler.tunnelOperator.SetKey(payload.Key) + err = handler.edgeManager.SetKey(payload.Key) if err != nil { return &httperror.HandlerError{http.StatusInternalServerError, "Unable to associate Edge key", err} } - if handler.clusterService != nil { - tags := handler.clusterService.GetTags() - tags[agent.MemberTagEdgeKeySet] = "set" - err = handler.clusterService.UpdateTags(tags) - if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to update agent cluster tags", err} - } + err = handler.edgeManager.Start() + if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to start Edge Manager", err} } - go handler.tunnelOperator.Start() - return response.Empty(w) } diff --git a/http/handler/key/key_inspect.go b/http/handler/key/key_inspect.go index 37f4fe92..0b9bd25f 100644 --- a/http/handler/key/key_inspect.go +++ b/http/handler/key/key_inspect.go @@ -13,15 +13,15 @@ type keyInspectResponse struct { } func (handler *Handler) keyInspect(w http.ResponseWriter, r *http.Request) *httperror.HandlerError { - if !handler.edgeMode { + if !handler.edgeManager.IsEdgeModeEnabled() { return &httperror.HandlerError{http.StatusServiceUnavailable, "Edge key management is disabled on non Edge agent", errors.New("Edge key management is disabled")} } - if !handler.tunnelOperator.IsKeySet() { + if !handler.edgeManager.IsKeySet() { return &httperror.HandlerError{http.StatusNotFound, "No key associated to this agent", errors.New("Edge key unavailable")} } - edgeKey := handler.tunnelOperator.GetKey() + edgeKey := handler.edgeManager.GetKey() return response.JSON(w, keyInspectResponse{ Key: edgeKey, diff --git a/http/handler/websocket/attach.go b/http/handler/websocket/attach.go index 6c4ba2f2..5e2677b2 100644 --- a/http/handler/websocket/attach.go +++ b/http/handler/websocket/attach.go @@ -23,7 +23,7 @@ func (handler *Handler) websocketAttach(w http.ResponseWriter, r *http.Request) } agentTargetHeader := r.Header.Get(agent.HTTPTargetHeaderName) - if agentTargetHeader == handler.agentTags[agent.MemberTagKeyNodeName] { + if agentTargetHeader == handler.agentTags.NodeName { return handler.handleAttachRequest(w, r) } diff --git a/http/handler/websocket/exec.go b/http/handler/websocket/exec.go index 77db8b02..e3c1b290 100644 --- a/http/handler/websocket/exec.go +++ b/http/handler/websocket/exec.go @@ -24,7 +24,7 @@ func (handler *Handler) websocketExec(w http.ResponseWriter, r *http.Request) *h agentTargetHeader := r.Header.Get(agent.HTTPTargetHeaderName) - if agentTargetHeader == handler.agentTags[agent.MemberTagKeyNodeName] { + if agentTargetHeader == handler.agentTags.NodeName { return handler.handleExecRequest(w, r) } diff --git a/http/handler/websocket/handler.go b/http/handler/websocket/handler.go index 8533f299..d3de7284 100644 --- a/http/handler/websocket/handler.go +++ b/http/handler/websocket/handler.go @@ -14,7 +14,7 @@ type ( *mux.Router clusterService agent.ClusterService connectionUpgrader websocket.Upgrader - agentTags map[string]string + agentTags *agent.InfoTags } execStartOperationPayload struct { @@ -24,7 +24,7 @@ type ( ) // NewHandler returns a new instance of Handler. -func NewHandler(clusterService agent.ClusterService, agentTags map[string]string, notaryService *security.NotaryService) *Handler { +func NewHandler(clusterService agent.ClusterService, agentTags *agent.InfoTags, notaryService *security.NotaryService) *Handler { h := &Handler{ Router: mux.NewRouter(), connectionUpgrader: websocket.Upgrader{}, diff --git a/http/proxy/agentproxy.go b/http/proxy/agentproxy.go index f2045f67..beeb4c16 100644 --- a/http/proxy/agentproxy.go +++ b/http/proxy/agentproxy.go @@ -12,12 +12,12 @@ import ( // AgentProxy enables redirection to different nodes type AgentProxy struct { clusterService agent.ClusterService - agentTags map[string]string + agentTags *agent.InfoTags useTLS bool } // NewAgentProxy returns a pointer to a new AgentProxy object -func NewAgentProxy(clusterService agent.ClusterService, agentTags map[string]string, useTLS bool) *AgentProxy { +func NewAgentProxy(clusterService agent.ClusterService, agentTags *agent.InfoTags, useTLS bool) *AgentProxy { return &AgentProxy{ clusterService: clusterService, agentTags: agentTags, @@ -36,7 +36,7 @@ func (p *AgentProxy) Redirect(next http.Handler) http.Handler { agentTargetHeader := r.Header.Get(agent.HTTPTargetHeaderName) - if agentTargetHeader == p.agentTags[agent.MemberTagKeyNodeName] || agentTargetHeader == "" { + if agentTargetHeader == p.agentTags.NodeName || agentTargetHeader == "" { next.ServeHTTP(rw, r) } else { targetMember := p.clusterService.GetMemberByNodeName(agentTargetHeader) diff --git a/http/server.go b/http/server.go index dcaada81..03d7e510 100644 --- a/http/server.go +++ b/http/server.go @@ -7,6 +7,7 @@ import ( "github.com/portainer/agent" "github.com/portainer/agent/http/handler" + "github.com/portainer/agent/internal/edge" ) // APIServer is the web server exposing the API of an agent. @@ -16,10 +17,9 @@ type APIServer struct { systemService agent.SystemService clusterService agent.ClusterService signatureService agent.DigitalSignatureService - tunnelOperator agent.TunnelOperator - agentTags map[string]string + edgeManager *edge.Manager + agentTags *agent.InfoTags agentOptions *agent.Options - edgeMode bool } // APIServerConfig represents a server configuration @@ -30,10 +30,9 @@ type APIServerConfig struct { SystemService agent.SystemService ClusterService agent.ClusterService SignatureService agent.DigitalSignatureService - TunnelOperator agent.TunnelOperator - AgentTags map[string]string + EdgeManager *edge.Manager + AgentTags *agent.InfoTags AgentOptions *agent.Options - EdgeMode bool } // NewAPIServer returns a pointer to a APIServer. @@ -44,10 +43,9 @@ func NewAPIServer(config *APIServerConfig) *APIServer { systemService: config.SystemService, clusterService: config.ClusterService, signatureService: config.SignatureService, - tunnelOperator: config.TunnelOperator, + edgeManager: config.EdgeManager, agentTags: config.AgentTags, agentOptions: config.AgentOptions, - edgeMode: config.EdgeMode, } } @@ -56,10 +54,9 @@ func (server *APIServer) StartUnsecured() error { config := &handler.Config{ SystemService: server.systemService, ClusterService: server.clusterService, - TunnelOperator: server.tunnelOperator, AgentTags: server.agentTags, AgentOptions: server.agentOptions, - EdgeMode: server.edgeMode, + EdgeManager: server.edgeManager, Secured: false, } @@ -84,10 +81,9 @@ func (server *APIServer) StartSecured() error { SystemService: server.systemService, ClusterService: server.clusterService, SignatureService: server.signatureService, - TunnelOperator: server.tunnelOperator, AgentTags: server.agentTags, AgentOptions: server.agentOptions, - EdgeMode: server.edgeMode, + EdgeManager: server.edgeManager, Secured: true, } diff --git a/http/tunnel/key.go b/http/tunnel/key.go deleted file mode 100644 index 02e2f74c..00000000 --- a/http/tunnel/key.go +++ /dev/null @@ -1,39 +0,0 @@ -package tunnel - -import ( - "encoding/base64" - "errors" - "fmt" - "strings" -) - -// parseEdgeKey decodes a base64 encoded key and extract the decoded information from the following -// format: ||| -// are expected in the user:password format -func parseEdgeKey(key string) (*edgeKey, error) { - decodedKey, err := base64.RawStdEncoding.DecodeString(key) - if err != nil { - return nil, err - } - - keyInfo := strings.Split(string(decodedKey), "|") - - if len(keyInfo) != 4 { - return nil, errors.New("invalid key format") - } - - edgeKey := &edgeKey{ - PortainerInstanceURL: keyInfo[0], - TunnelServerAddr: keyInfo[1], - TunnelServerFingerprint: keyInfo[2], - EndpointID: keyInfo[3], - } - - return edgeKey, nil -} - -func encodeKey(edgeKey *edgeKey) string { - keyInfo := fmt.Sprintf("%s|%s|%s|%s", edgeKey.PortainerInstanceURL, edgeKey.TunnelServerAddr, edgeKey.TunnelServerFingerprint, edgeKey.EndpointID) - encodedKey := base64.RawStdEncoding.EncodeToString([]byte(keyInfo)) - return encodedKey -} diff --git a/http/tunnel/poll.go b/http/tunnel/poll.go deleted file mode 100644 index b9205c99..00000000 --- a/http/tunnel/poll.go +++ /dev/null @@ -1,136 +0,0 @@ -package tunnel - -import ( - "crypto/tls" - "encoding/base64" - "encoding/json" - "errors" - "fmt" - "log" - "net/http" - "strconv" - "time" - - "github.com/portainer/agent" - "github.com/portainer/libcrypto" -) - -const clientDefaultPollTimeout = 5 - -type pollStatusResponse struct { - Status string `json:"status"` - Port int `json:"port"` - Schedules []agent.Schedule `json:"schedules"` - CheckinInterval float64 `json:"checkin"` - Credentials string `json:"credentials"` -} - -func (operator *Operator) createHTTPClient(timeout float64) { - httpCli := &http.Client{ - Timeout: time.Duration(timeout) * time.Second, - } - - if operator.insecurePoll { - httpCli.Transport = &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - }, - } - } - - operator.httpClient = httpCli -} - -func (operator *Operator) poll() error { - pollURL := fmt.Sprintf("%s/api/endpoints/%s/status", operator.key.PortainerInstanceURL, operator.key.EndpointID) - req, err := http.NewRequest("GET", pollURL, nil) - if err != nil { - return err - } - - req.Header.Set(agent.HTTPEdgeIdentifierHeaderName, operator.edgeID) - - if operator.httpClient == nil { - operator.createHTTPClient(clientDefaultPollTimeout) - } - - resp, err := operator.httpClient.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - log.Printf("[DEBUG] [http,edge,poll] [response_code: %d] [message: Poll request failure]", resp.StatusCode) - return errors.New("short poll request failed") - } - - var responseData pollStatusResponse - err = json.NewDecoder(resp.Body).Decode(&responseData) - if err != nil { - return err - } - - log.Printf("[DEBUG] [http,edge,poll] [status: %s] [port: %d] [schedule_count: %d] [checkin_interval_seconds: %f]", responseData.Status, responseData.Port, len(responseData.Schedules), responseData.CheckinInterval) - - if responseData.Status == "IDLE" && operator.tunnelClient.IsTunnelOpen() { - log.Printf("[DEBUG] [http,edge,poll] [status: %s] [message: Idle status detected, shutting down tunnel]", responseData.Status) - - err := operator.tunnelClient.CloseTunnel() - if err != nil { - log.Printf("[ERROR] [http,edge,poll] [message: Unable to shutdown tunnel] [error: %s]", err) - } - } - - if responseData.Status == "REQUIRED" && !operator.tunnelClient.IsTunnelOpen() { - log.Println("[DEBUG] [http,edge,poll] [message: Required status detected, creating reverse tunnel]") - - err := operator.createTunnel(responseData.Credentials, responseData.Port) - if err != nil { - log.Printf("[ERROR] [http,edge,poll] [message: Unable to create tunnel] [error: %s]", err) - return err - } - } - - err = operator.scheduleManager.Schedule(responseData.Schedules) - if err != nil { - log.Printf("[ERROR] [http,edge,cron] [message: an error occured during schedule management] [err: %s]", err) - } - - if responseData.CheckinInterval != operator.pollIntervalInSeconds { - log.Printf("[DEBUG] [http,edge,poll] [old_interval: %f] [new_interval: %f] [message: updating poll interval]", operator.pollIntervalInSeconds, responseData.CheckinInterval) - operator.pollIntervalInSeconds = responseData.CheckinInterval - operator.createHTTPClient(responseData.CheckinInterval) - go operator.restartStatusPollLoop() - } - - return nil -} - -func (operator *Operator) createTunnel(encodedCredentials string, remotePort int) error { - decodedCredentials, err := base64.RawStdEncoding.DecodeString(encodedCredentials) - if err != nil { - return err - } - - credentials, err := libcrypto.Decrypt(decodedCredentials, []byte(operator.edgeID)) - if err != nil { - return err - } - - tunnelConfig := agent.TunnelConfig{ - ServerAddr: operator.key.TunnelServerAddr, - ServerFingerpint: operator.key.TunnelServerFingerprint, - Credentials: string(credentials), - RemotePort: strconv.Itoa(remotePort), - LocalAddr: operator.apiServerAddr, - } - - err = operator.tunnelClient.CreateTunnel(tunnelConfig) - if err != nil { - return err - } - - operator.ResetActivityTimer() - return nil -} diff --git a/http/tunnel/tunnel.go b/http/tunnel/tunnel.go deleted file mode 100644 index d5a61c34..00000000 --- a/http/tunnel/tunnel.go +++ /dev/null @@ -1,198 +0,0 @@ -package tunnel - -import ( - "errors" - "log" - "net/http" - "time" - - "github.com/portainer/agent" - "github.com/portainer/agent/chisel" - "github.com/portainer/agent/filesystem" -) - -const tunnelActivityCheckInterval = 30 * time.Second - -type edgeKey struct { - PortainerInstanceURL string - TunnelServerAddr string - TunnelServerFingerprint string - EndpointID string -} - -// Operator is used to poll a Portainer instance and to establish a reverse tunnel if needed. -// It also takes care of closing the tunnel after a set period of inactivity. -type Operator struct { - apiServerAddr string - pollIntervalInSeconds float64 - insecurePoll bool - inactivityTimeout time.Duration - edgeID string - key *edgeKey - httpClient *http.Client - tunnelClient agent.ReverseTunnelClient - scheduleManager agent.Scheduler - lastActivity time.Time - refreshSignal chan struct{} -} - -// OperatorConfig represents the configuration used to create a new Operator. -type OperatorConfig struct { - APIServerAddr string - EdgeID string - InactivityTimeout string - PollFrequency string - InsecurePoll bool -} - -// NewTunnelOperator creates a new reverse tunnel operator -func NewTunnelOperator(config *OperatorConfig) (*Operator, error) { - pollFrequency, err := time.ParseDuration(config.PollFrequency) - if err != nil { - return nil, err - } - - inactivityTimeout, err := time.ParseDuration(config.InactivityTimeout) - if err != nil { - return nil, err - } - - return &Operator{ - apiServerAddr: config.APIServerAddr, - edgeID: config.EdgeID, - pollIntervalInSeconds: pollFrequency.Seconds(), - insecurePoll: config.InsecurePoll, - inactivityTimeout: inactivityTimeout, - tunnelClient: chisel.NewClient(), - scheduleManager: filesystem.NewCronManager(), - refreshSignal: make(chan struct{}), - }, nil -} - -// SetKey parses and associate a key to the operator -func (operator *Operator) SetKey(key string) error { - edgeKey, err := parseEdgeKey(key) - if err != nil { - return err - } - - err = filesystem.WriteFile(agent.DataDirectory, agent.EdgeKeyFile, []byte(key), 0444) - if err != nil { - return err - } - - operator.key = edgeKey - - return nil -} - -// GetKey returns the key associated to the operator -func (operator *Operator) GetKey() string { - var encodedKey string - - if operator.key != nil { - encodedKey = encodeKey(operator.key) - } - - return encodedKey -} - -// IsKeySet checks if a key is associated to the operator -func (operator *Operator) IsKeySet() bool { - if operator.key == nil { - return false - } - return true -} - -// CloseTunnel closes the reverse tunnel managed by the operator -func (operator *Operator) CloseTunnel() error { - return operator.tunnelClient.CloseTunnel() -} - -// ResetActivityTimer will reset the last activity time timer -func (operator *Operator) ResetActivityTimer() { - if operator.tunnelClient.IsTunnelOpen() { - operator.lastActivity = time.Now() - } -} - -// Start will start two loops in go routines -// The first loop will poll the Portainer instance for the status of the associated endpoint and create a reverse tunnel -// if needed as well as manage schedules. -// The second loop will check for the last activity of the reverse tunnel and close the tunnel if it exceeds the tunnel -// inactivity duration. -func (operator *Operator) Start() error { - if operator.key == nil { - return errors.New("missing Edge key") - } - - operator.startStatusPollLoop() - operator.startActivityMonitoringLoop() - - return nil -} - -func (operator *Operator) restartStatusPollLoop() { - close(operator.refreshSignal) - operator.refreshSignal = make(chan struct{}) - operator.startStatusPollLoop() -} - -func (operator *Operator) startStatusPollLoop() { - log.Printf("[DEBUG] [http,edge,poll] [poll_interval_seconds: %f] [server_url: %s] [message: starting Portainer short-polling client]", operator.pollIntervalInSeconds, operator.key.PortainerInstanceURL) - - ticker := time.NewTicker(time.Duration(operator.pollIntervalInSeconds) * time.Second) - go func() { - for { - select { - case <-ticker.C: - err := operator.poll() - if err != nil { - log.Printf("[ERROR] [edge,http,poll] [message: an error occured during short poll] [error: %s]", err) - } - - case <-operator.refreshSignal: - log.Println("[DEBUG] [http,edge,poll] [message: shutting down Portainer short-polling client]") - ticker.Stop() - return - } - } - }() -} - -func (operator *Operator) startActivityMonitoringLoop() { - ticker := time.NewTicker(tunnelActivityCheckInterval) - quit := make(chan struct{}) - - log.Printf("[DEBUG] [http,edge,monitoring] [monitoring_interval_seconds: %f] [inactivity_timeout: %s] [message: starting activity monitoring loop]", tunnelActivityCheckInterval.Seconds(), operator.inactivityTimeout.String()) - - go func() { - for { - select { - case <-ticker.C: - - if operator.lastActivity.IsZero() { - continue - } - - elapsed := time.Since(operator.lastActivity) - log.Printf("[DEBUG] [http,edge,monitoring] [tunnel_last_activity_seconds: %f] [message: tunnel activity monitoring]", elapsed.Seconds()) - - if operator.tunnelClient.IsTunnelOpen() && elapsed.Seconds() > operator.inactivityTimeout.Seconds() { - - log.Printf("[INFO] [http,edge,monitoring] [tunnel_last_activity_seconds: %f] [message: shutting down tunnel after inactivity period]", elapsed.Seconds()) - - err := operator.tunnelClient.CloseTunnel() - if err != nil { - log.Printf("[ERROR] [http,edge,monitoring] [message: unable to shutdown tunnel] [error: %s]", err) - } - } - - case <-quit: - ticker.Stop() - return - } - } - }() -} diff --git a/internal/edge/edge.go b/internal/edge/edge.go new file mode 100644 index 00000000..301bb200 --- /dev/null +++ b/internal/edge/edge.go @@ -0,0 +1,159 @@ +package edge + +import ( + "errors" + "fmt" + "log" + "time" + + "github.com/portainer/agent" + "github.com/portainer/agent/exec" +) + +// Manager is used to manage all Edge features through multiple sub-components. It is mainly responsible for running the Edge background process. +type Manager struct { + clusterService agent.ClusterService + dockerStackService agent.DockerStackService + infoService agent.InfoService + stackManager *StackManager + pollService *PollService + pollServiceConfig *pollServiceConfig + key *edgeKey + edgeMode bool + agentOptions *agent.Options + advertiseAddr string +} + +// NewManager returns a pointer to a new instance of Manager +func NewManager(options *agent.Options, advertiseAddr string, clusterService agent.ClusterService, infoService agent.InfoService) *Manager { + return &Manager{ + clusterService: clusterService, + infoService: infoService, + agentOptions: options, + advertiseAddr: advertiseAddr, + edgeMode: options.EdgeMode, + } +} + +// Start starts the manager +func (manager *Manager) Start() error { + if !manager.IsKeySet() { + return errors.New("Unable to start Edge manager without key") + } + + apiServerAddr := fmt.Sprintf("%s:%s", manager.advertiseAddr, manager.agentOptions.AgentServerPort) + + pollServiceConfig := &pollServiceConfig{ + APIServerAddr: apiServerAddr, + EdgeID: manager.agentOptions.EdgeID, + PollFrequency: agent.DefaultEdgePollInterval, + InactivityTimeout: manager.agentOptions.EdgeInactivityTimeout, + InsecurePoll: manager.agentOptions.EdgeInsecurePoll, + PortainerURL: manager.key.PortainerInstanceURL, + EndpointID: manager.key.EndpointID, + TunnelServerAddr: manager.key.TunnelServerAddr, + TunnelServerFingerprint: manager.key.TunnelServerFingerprint, + } + + log.Printf("[DEBUG] [internal,edge] [api_addr: %s] [edge_id: %s] [poll_frequency: %s] [inactivity_timeout: %s] [insecure_poll: %t]", pollServiceConfig.APIServerAddr, pollServiceConfig.EdgeID, pollServiceConfig.PollFrequency, pollServiceConfig.InactivityTimeout, pollServiceConfig.InsecurePoll) + + dockerStackService, err := exec.NewDockerStackService(agent.DockerBinaryPath) + if err != nil { + return err + } + manager.dockerStackService = dockerStackService + + manager.stackManager = newStackManager(dockerStackService, manager.key.PortainerInstanceURL, manager.key.EndpointID, manager.agentOptions.EdgeID) + + pollService, err := newPollService(manager.stackManager, pollServiceConfig) + if err != nil { + return err + } + manager.pollService = pollService + + err = manager.startEdgeBackgroundProcess() + if err != nil { + return err + } + + return nil +} + +// IsEdgeModeEnabled returns true if edge mode is enabled +func (manager *Manager) IsEdgeModeEnabled() bool { + return manager.edgeMode +} + +// ResetActivityTimer resets the activity timer +func (manager *Manager) ResetActivityTimer() { + manager.pollService.resetActivityTimer() +} + +func (manager *Manager) startEdgeBackgroundProcess() error { + + runtimeCheckFrequency, err := time.ParseDuration(agent.DefaultConfigCheckInterval) + if err != nil { + return err + } + + err = manager.checkRuntimeConfig() + if err != nil { + return err + } + + ticker := time.NewTicker(runtimeCheckFrequency) + + go func() { + for { + select { + case <-ticker.C: + err := manager.checkRuntimeConfig() + if err != nil { + log.Printf("[ERROR] [internal,edge,runtime] [message: an error occured during Docker runtime configuration check] [error: %s]", err) + } + } + } + }() + + return nil +} + +func (manager *Manager) checkRuntimeConfig() error { + agentTags, err := manager.infoService.GetInformationFromDockerEngine() + if err != nil { + return err + } + + agentRunsOnLeaderNode := agentTags.Leader + agentRunsOnSwarm := agentTags.EngineStatus == agent.EngineStatusSwarm + + log.Printf("[DEBUG] [internal,edge,docker] [message: Docker runtime configuration check] [engine_status: %s] [leader_node: %t]", agentTags.EngineStatus, agentRunsOnLeaderNode) + + if !agentRunsOnSwarm || agentRunsOnLeaderNode { + err = manager.pollService.start() + if err != nil { + return err + } + + } else { + err = manager.pollService.stop() + if err != nil { + return err + } + } + + if agentRunsOnSwarm && agentRunsOnLeaderNode { + err = manager.stackManager.start() + if err != nil { + return err + } + + } else { + err = manager.stackManager.stop() + if err != nil { + return err + } + } + + return nil +} diff --git a/internal/edge/key.go b/internal/edge/key.go new file mode 100644 index 00000000..14bed01e --- /dev/null +++ b/internal/edge/key.go @@ -0,0 +1,124 @@ +package edge + +import ( + "encoding/base64" + "errors" + "fmt" + "strings" + + "github.com/portainer/agent" + "github.com/portainer/agent/filesystem" + "github.com/portainer/agent/http/client" +) + +type edgeKey struct { + PortainerInstanceURL string + TunnelServerAddr string + TunnelServerFingerprint string + EndpointID string +} + +// SetKey parses and associates an Edge key to the agent. +// If the agent is running inside a Swarm cluster, it will also set the "set" flag to specify that a key is set on this agent in the cluster. +func (manager *Manager) SetKey(key string) error { + edgeKey, err := parseEdgeKey(key) + if err != nil { + return err + } + + err = filesystem.WriteFile(agent.DataDirectory, agent.EdgeKeyFile, []byte(key), 0444) + if err != nil { + return err + } + + manager.key = edgeKey + + if manager.clusterService != nil { + tags := manager.clusterService.GetTags() + tags.EdgeKeySet = true + err = manager.clusterService.UpdateTags(tags) + if err != nil { + return err + } + } + + return nil +} + +// GetKey returns the Edge key associated to the agent +func (manager *Manager) GetKey() string { + var encodedKey string + + if manager.key != nil { + encodedKey = encodeKey(manager.key) + } + + return encodedKey +} + +// IsKeySet returns true if an Edge key is associated to the agent +func (manager *Manager) IsKeySet() bool { + if manager.key == nil { + return false + } + return true +} + +// PropagateKeyInCluster propagates the Edge key associated to the agent to all the other agents inside the cluster +func (manager *Manager) PropagateKeyInCluster() error { + if manager.clusterService == nil { + return nil + } + + httpCli := client.NewAPIClient() + tags := manager.clusterService.GetTags() + currentNodeName := tags.NodeName + + members := manager.clusterService.Members() + for _, member := range members { + + if member.NodeName == currentNodeName || member.EdgeKeySet { + continue + } + + memberAddr := fmt.Sprintf("%s:%s", member.IPAddress, member.Port) + + err := httpCli.SetEdgeKey(memberAddr, manager.GetKey()) + if err != nil { + return err + } + } + + return nil +} + +// parseEdgeKey decodes a base64 encoded key and extract the decoded information from the following +// format: ||| +// are expected in the user:password format +func parseEdgeKey(key string) (*edgeKey, error) { + decodedKey, err := base64.RawStdEncoding.DecodeString(key) + if err != nil { + return nil, err + } + + keyInfo := strings.Split(string(decodedKey), "|") + + if len(keyInfo) != 4 { + return nil, errors.New("invalid key format") + } + + edgeKey := &edgeKey{ + PortainerInstanceURL: keyInfo[0], + TunnelServerAddr: keyInfo[1], + TunnelServerFingerprint: keyInfo[2], + EndpointID: keyInfo[3], + } + + return edgeKey, nil +} + +func encodeKey(edgeKey *edgeKey) string { + keyInfo := fmt.Sprintf("%s|%s|%s|%s", edgeKey.PortainerInstanceURL, edgeKey.TunnelServerAddr, edgeKey.TunnelServerFingerprint, edgeKey.EndpointID) + encodedKey := base64.RawStdEncoding.EncodeToString([]byte(keyInfo)) + return encodedKey +} diff --git a/internal/edge/poll.go b/internal/edge/poll.go new file mode 100644 index 00000000..aa275c6f --- /dev/null +++ b/internal/edge/poll.go @@ -0,0 +1,322 @@ +package edge + +import ( + "crypto/tls" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "log" + "net/http" + "strconv" + "time" + + "github.com/portainer/agent" + "github.com/portainer/agent/chisel" + "github.com/portainer/agent/filesystem" + "github.com/portainer/libcrypto" +) + +const tunnelActivityCheckInterval = 30 * time.Second + +// PollService is used to poll a Portainer instance to retrieve the status associated to the Edge endpoint. +// It is responsible for managing the state of the reverse tunnel (open and closing after inactivity). +// It is also responsible for retrieving the data associated to Edge stacks and schedules. +type PollService struct { + apiServerAddr string + pollIntervalInSeconds float64 + insecurePoll bool + inactivityTimeout time.Duration + edgeID string + httpClient *http.Client + tunnelClient agent.ReverseTunnelClient + scheduleManager agent.Scheduler + lastActivity time.Time + refreshSignal chan struct{} + edgeStackManager *StackManager + portainerURL string + endpointID string + tunnelServerAddr string + tunnelServerFingerprint string +} + +type pollServiceConfig struct { + APIServerAddr string + EdgeID string + InactivityTimeout string + PollFrequency string + InsecurePoll bool + PortainerURL string + EndpointID string + TunnelServerAddr string + TunnelServerFingerprint string +} + +// newPollService returns a pointer to a new instance of PollService +func newPollService(edgeStackManager *StackManager, config *pollServiceConfig) (*PollService, error) { + pollFrequency, err := time.ParseDuration(config.PollFrequency) + if err != nil { + return nil, err + } + + inactivityTimeout, err := time.ParseDuration(config.InactivityTimeout) + if err != nil { + return nil, err + } + + return &PollService{ + apiServerAddr: config.APIServerAddr, + edgeID: config.EdgeID, + pollIntervalInSeconds: pollFrequency.Seconds(), + insecurePoll: config.InsecurePoll, + inactivityTimeout: inactivityTimeout, + tunnelClient: chisel.NewClient(), + scheduleManager: filesystem.NewCronManager(), + refreshSignal: nil, + edgeStackManager: edgeStackManager, + portainerURL: config.PortainerURL, + endpointID: config.EndpointID, + tunnelServerAddr: config.TunnelServerAddr, + tunnelServerFingerprint: config.TunnelServerFingerprint, + }, nil +} + +func (service *PollService) closeTunnel() error { + return service.tunnelClient.CloseTunnel() +} + +func (service *PollService) resetActivityTimer() { + if service.tunnelClient.IsTunnelOpen() { + service.lastActivity = time.Now() + } +} + +// start will start two loops in go routines +// The first loop will poll the Portainer instance for the status of the associated endpoint and create a reverse tunnel +// if needed as well as manage schedules. +// The second loop will check for the last activity of the reverse tunnel and close the tunnel if it exceeds the tunnel +// inactivity duration. +func (service *PollService) start() error { + if service.refreshSignal != nil { + return nil + } + + service.refreshSignal = make(chan struct{}) + service.startStatusPollLoop() + service.startActivityMonitoringLoop() + + return nil +} + +func (service *PollService) stop() error { + if service.refreshSignal != nil { + close(service.refreshSignal) + service.refreshSignal = nil + } + return nil +} + +func (service *PollService) restartStatusPollLoop() { + service.stop() + service.startStatusPollLoop() +} + +func (service *PollService) startStatusPollLoop() error { + log.Printf("[DEBUG] [internal,edge,poll] [poll_interval_seconds: %f] [server_url: %s] [message: starting Portainer short-polling client]", service.pollIntervalInSeconds, service.portainerURL) + + ticker := time.NewTicker(time.Duration(service.pollIntervalInSeconds) * time.Second) + go func() { + for { + select { + case <-ticker.C: + err := service.poll() + if err != nil { + log.Printf("[ERROR] [internal,edge,poll] [message: an error occured during short poll] [error: %s]", err) + } + + case <-service.refreshSignal: + log.Println("[DEBUG] [internal,edge,poll] [message: shutting down Portainer short-polling client]") + ticker.Stop() + return + } + } + }() + + return nil +} + +func (service *PollService) startActivityMonitoringLoop() { + ticker := time.NewTicker(tunnelActivityCheckInterval) + quit := make(chan struct{}) + + log.Printf("[DEBUG] [internal,edge,monitoring] [monitoring_interval_seconds: %f] [inactivity_timeout: %s] [message: starting activity monitoring loop]", tunnelActivityCheckInterval.Seconds(), service.inactivityTimeout.String()) + + go func() { + for { + select { + case <-ticker.C: + + if service.lastActivity.IsZero() { + continue + } + + elapsed := time.Since(service.lastActivity) + log.Printf("[DEBUG] [internal,edge,monitoring] [tunnel_last_activity_seconds: %f] [message: tunnel activity monitoring]", elapsed.Seconds()) + + if service.tunnelClient.IsTunnelOpen() && elapsed.Seconds() > service.inactivityTimeout.Seconds() { + + log.Printf("[INFO] [internal,edge,monitoring] [tunnel_last_activity_seconds: %f] [message: shutting down tunnel after inactivity period]", elapsed.Seconds()) + + err := service.tunnelClient.CloseTunnel() + if err != nil { + log.Printf("[ERROR] [internal,edge,monitoring] [message: unable to shutdown tunnel] [error: %s]", err) + } + } + + case <-quit: + ticker.Stop() + return + } + } + }() +} + +const clientDefaultPollTimeout = 5 + +type stackStatus struct { + ID int + Version int +} + +type pollStatusResponse struct { + Status string `json:"status"` + Port int `json:"port"` + Schedules []agent.Schedule `json:"schedules"` + CheckinInterval float64 `json:"checkin"` + Credentials string `json:"credentials"` + Stacks []stackStatus `json:"stacks"` +} + +func (service *PollService) createHTTPClient(timeout float64) { + httpCli := &http.Client{ + Timeout: time.Duration(timeout) * time.Second, + } + + if service.insecurePoll { + httpCli.Transport = &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + } + } + + service.httpClient = httpCli +} + +func (service *PollService) poll() error { + + pollURL := fmt.Sprintf("%s/api/endpoints/%s/status", service.portainerURL, service.endpointID) + req, err := http.NewRequest("GET", pollURL, nil) + if err != nil { + return err + } + + req.Header.Set(agent.HTTPEdgeIdentifierHeaderName, service.edgeID) + + if service.httpClient == nil { + service.createHTTPClient(clientDefaultPollTimeout) + } + + resp, err := service.httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + log.Printf("[DEBUG] [internal,edge,poll] [response_code: %d] [message: Poll request failure]", resp.StatusCode) + return errors.New("short poll request failed") + } + + var responseData pollStatusResponse + err = json.NewDecoder(resp.Body).Decode(&responseData) + if err != nil { + return err + } + + log.Printf("[DEBUG] [internal,edge,poll] [status: %s] [port: %d] [schedule_count: %d] [checkin_interval_seconds: %f]", responseData.Status, responseData.Port, len(responseData.Schedules), responseData.CheckinInterval) + + if responseData.Status == "IDLE" && service.tunnelClient.IsTunnelOpen() { + log.Printf("[DEBUG] [internal,edge,poll] [status: %s] [message: Idle status detected, shutting down tunnel]", responseData.Status) + + err := service.tunnelClient.CloseTunnel() + if err != nil { + log.Printf("[ERROR] [internal,edge,poll] [message: Unable to shutdown tunnel] [error: %s]", err) + } + } + + if responseData.Status == "REQUIRED" && !service.tunnelClient.IsTunnelOpen() { + log.Println("[DEBUG] [internal,edge,poll] [message: Required status detected, creating reverse tunnel]") + + err := service.createTunnel(responseData.Credentials, responseData.Port) + if err != nil { + log.Printf("[ERROR] [internal,edge,poll] [message: Unable to create tunnel] [error: %s]", err) + return err + } + } + + err = service.scheduleManager.Schedule(responseData.Schedules) + if err != nil { + log.Printf("[ERROR] [internal,edge,cron] [message: an error occured during schedule management] [err: %s]", err) + } + + if responseData.CheckinInterval != service.pollIntervalInSeconds { + log.Printf("[DEBUG] [internal,edge,poll] [old_interval: %f] [new_interval: %f] [message: updating poll interval]", service.pollIntervalInSeconds, responseData.CheckinInterval) + service.pollIntervalInSeconds = responseData.CheckinInterval + service.createHTTPClient(responseData.CheckinInterval) + go service.restartStatusPollLoop() + } + + if responseData.Stacks != nil { + stacks := map[int]int{} + for _, stack := range responseData.Stacks { + stacks[stack.ID] = stack.Version + } + + err := service.edgeStackManager.updateStacksStatus(stacks) + if err != nil { + log.Printf("[ERROR] [internal,edge,stack] [message: an error occured during stack management] [error: %s]", err) + return err + } + } + + return nil +} + +func (service *PollService) createTunnel(encodedCredentials string, remotePort int) error { + decodedCredentials, err := base64.RawStdEncoding.DecodeString(encodedCredentials) + if err != nil { + return err + } + + credentials, err := libcrypto.Decrypt(decodedCredentials, []byte(service.edgeID)) + if err != nil { + return err + } + + tunnelConfig := agent.TunnelConfig{ + ServerAddr: service.tunnelServerAddr, + ServerFingerpint: service.tunnelServerFingerprint, + Credentials: string(credentials), + RemotePort: strconv.Itoa(remotePort), + LocalAddr: service.apiServerAddr, + } + + err = service.tunnelClient.CreateTunnel(tunnelConfig) + if err != nil { + return err + } + + service.resetActivityTimer() + return nil +} diff --git a/internal/edge/stack.go b/internal/edge/stack.go new file mode 100644 index 00000000..3b9b4765 --- /dev/null +++ b/internal/edge/stack.go @@ -0,0 +1,244 @@ +package edge + +import ( + "fmt" + "log" + "time" + + "github.com/portainer/agent" + "github.com/portainer/agent/filesystem" + "github.com/portainer/agent/http/client" +) + +type edgeStackID int + +type edgeStack struct { + ID edgeStackID + Name string + Version int + FileFolder string + FileName string + Prune bool + Status edgeStackStatus + Action edgeStackAction +} + +type edgeStackStatus int + +const ( + _ edgeStackStatus = iota + statusPending + statusDone + statusError +) + +type edgeStackAction int + +const ( + _ edgeStackAction = iota + actionDeploy + actionUpdate + actionDelete + actionIdle +) + +type edgeStackStatusType int + +const ( + _ edgeStackStatusType = iota + edgeStackStatusOk + edgeStackStatusError + edgeStackStatusAcknowledged +) + +// StackManager represents a service for managing Edge stacks +type StackManager struct { + stacks map[edgeStackID]*edgeStack + stopSignal chan struct{} + dockerStackService agent.DockerStackService + portainerURL string + endpointID string + isEnabled bool + httpClient *client.PortainerClient +} + +// newStackManager returns a pointer to a new instance of StackManager +func newStackManager(dockerStackService agent.DockerStackService, portainerURL, endpointID, edgeID string) *StackManager { + cli := client.NewPortainerClient(portainerURL, endpointID, edgeID) + + return &StackManager{ + dockerStackService: dockerStackService, + stacks: map[edgeStackID]*edgeStack{}, + stopSignal: nil, + httpClient: cli, + } +} + +func (manager *StackManager) updateStacksStatus(stacks map[int]int) error { + if !manager.isEnabled { + return nil + } + + for stackID, version := range stacks { + stack, ok := manager.stacks[edgeStackID(stackID)] + if ok { + if stack.Version == version { + continue + } + log.Printf("[DEBUG] [internal,edge,stack] [stack_identifier: %d] [message: marking stack for update]", stackID) + + stack.Action = actionUpdate + stack.Version = version + stack.Status = statusPending + } else { + log.Printf("[DEBUG] [internal,edge,stack] [stack_identifier: %d] [message: marking stack for deployment]", stackID) + + stack = &edgeStack{ + Action: actionDeploy, + ID: edgeStackID(stackID), + Status: statusPending, + Version: version, + } + } + + stackConfig, err := manager.httpClient.GetEdgeStackConfig(int(stack.ID)) + if err != nil { + return err + } + + stack.Prune = stackConfig.Prune + stack.Name = stackConfig.Name + + folder := fmt.Sprintf("%s/%d", agent.EdgeStackFilesPath, stackID) + fileName := "docker-compose.yml" + err = filesystem.WriteFile(folder, fileName, []byte(stackConfig.FileContent), 644) + if err != nil { + return err + } + + stack.FileFolder = folder + stack.FileName = fileName + + manager.stacks[stack.ID] = stack + + err = manager.httpClient.SetEdgeStackStatus(int(stack.ID), int(edgeStackStatusAcknowledged), "") + if err != nil { + return err + } + } + + for stackID, stack := range manager.stacks { + if _, ok := stacks[int(stackID)]; !ok { + log.Printf("[DEBUG] [internal,edge,stack] [stack_identifier: %d] [message: marking stack for deletion]", stackID) + stack.Action = actionDelete + stack.Status = statusPending + + manager.stacks[stackID] = stack + } + } + + return nil +} + +func (manager *StackManager) stop() error { + if manager.stopSignal != nil { + close(manager.stopSignal) + manager.stopSignal = nil + manager.isEnabled = false + } + + return nil +} + +func (manager *StackManager) start() error { + if manager.stopSignal != nil { + return nil + } + + manager.isEnabled = true + manager.stopSignal = make(chan struct{}) + + queueSleepInterval, err := time.ParseDuration(agent.EdgeStackQueueSleepInterval) + if err != nil { + return err + } + + go (func() { + for { + select { + case <-manager.stopSignal: + log.Println("[DEBUG] [internal,edge,stack] [message: shutting down Edge stack manager]") + return + default: + stack := manager.next() + if stack == nil { + timer1 := time.NewTimer(queueSleepInterval) + <-timer1.C + continue + } + + stackName := fmt.Sprintf("edge_%s", stack.Name) + stackFileLocation := fmt.Sprintf("%s/%s", stack.FileFolder, stack.FileName) + + if stack.Action == actionDeploy || stack.Action == actionUpdate { + manager.deployStack(stack, stackName, stackFileLocation) + } else if stack.Action == actionDelete { + manager.deleteStack(stack, stackName, stackFileLocation) + } + } + } + })() + + return nil +} + +func (manager *StackManager) next() *edgeStack { + for _, stack := range manager.stacks { + if stack.Status == statusPending { + return stack + } + } + return nil +} + +func (manager *StackManager) deployStack(stack *edgeStack, stackName, stackFileLocation string) { + log.Printf("[DEBUG] [internal,edge,stack] [stack_identifier: %d] [message: stack deployment]", stack.ID) + stack.Status = statusDone + stack.Action = actionIdle + responseStatus := int(edgeStackStatusOk) + errorMessage := "" + + err := manager.dockerStackService.Deploy(stackName, stackFileLocation, stack.Prune) + if err != nil { + log.Printf("[ERROR] [internal,edge,stack] [message: stack deployment failed] [error: %s]", err) + stack.Status = statusError + responseStatus = int(edgeStackStatusError) + errorMessage = err.Error() + } else { + log.Printf("[DEBUG] [internal,edge,stack] [stack_identifier: %d] [stack_version: %d] [message: stack deployed]", stack.ID, stack.Version) + } + + manager.stacks[stack.ID] = stack + + err = manager.httpClient.SetEdgeStackStatus(int(stack.ID), responseStatus, errorMessage) + if err != nil { + log.Printf("[ERROR] [internal,edge,stack] [message: unable to update Edge stack status] [error: %s]", err) + } +} + +func (manager *StackManager) deleteStack(stack *edgeStack, stackName, stackFileLocation string) { + log.Printf("[DEBUG] [internal,edge,stack] [stack_identifier: %d] [message: removing stack]", stack.ID) + err := filesystem.RemoveFile(stackFileLocation) + if err != nil { + log.Printf("[ERROR] [internal,edge,stack] [message: unable to delete Edge stack file] [error: %s]", err) + return + } + + err = manager.dockerStackService.Remove(stackName) + if err != nil { + log.Printf("[ERROR] [internal,edge,stack] [message: unable to remove stack] [error: %s]", err) + return + } + + delete(manager.stacks, stack.ID) +} diff --git a/os/options.go b/os/options.go index e362e91f..124b1a1a 100644 --- a/os/options.go +++ b/os/options.go @@ -23,6 +23,7 @@ const ( EnvKeyEdgeInactivityTimeout = "EDGE_INACTIVITY_TIMEOUT" EnvKeyEdgeInsecurePoll = "EDGE_INSECURE_POLL" EnvKeyLogLevel = "LOG_LEVEL" + EnvKeyDockerBinaryPath = "DOCKER_BINARY_PATH" ) type EnvOptionParser struct{} diff --git a/serf/cluster.go b/serf/cluster.go index 3421cc25..d0e78126 100644 --- a/serf/cluster.go +++ b/serf/cluster.go @@ -11,15 +11,29 @@ import ( "github.com/portainer/agent" ) +const ( + memberTagKeyAgentPort = "AgentPort" + memberTagKeyIsLeader = "NodeIsLeader" + memberTagKeyNodeName = "NodeName" + memberTagKeyNodeRole = "NodeRole" + memberTagKeyEngineStatus = "EngineStatus" + memberTagKeyEdgeKeySet = "EdgeKeySet" + + memberTagValueEngineStatusSwarm = "swarm" + memberTagValueEngineStatusStandalone = "standalone" + memberTagValueNodeRoleManager = "manager" + memberTagValueNodeRoleWorker = "worker" +) + // ClusterService is a service used to manage cluster related actions such as joining // the cluster, retrieving members in the clusters... type ClusterService struct { - tags map[string]string + tags *agent.InfoTags cluster *serf.Serf } // NewClusterService returns a pointer to a ClusterService. -func NewClusterService(tags map[string]string) *ClusterService { +func NewClusterService(tags *agent.InfoTags) *ClusterService { return &ClusterService{ tags: tags, } @@ -42,8 +56,8 @@ func (service *ClusterService) Create(advertiseAddr string, joinAddr []string) e conf := serf.DefaultConfig() conf.Init() - conf.NodeName = fmt.Sprintf("%s-%s", service.tags[agent.MemberTagKeyNodeName], conf.NodeName) - conf.Tags = service.tags + conf.NodeName = fmt.Sprintf("%s-%s", service.tags.NodeName, conf.NodeName) + conf.Tags = convertTagsToMap(service.tags) conf.MemberlistConfig.LogOutput = filter conf.LogOutput = filter conf.MemberlistConfig.AdvertiseAddr = advertiseAddr @@ -80,13 +94,13 @@ func (service *ClusterService) Members() []agent.ClusterMember { if member.Status == serf.StatusAlive { clusterMember := agent.ClusterMember{ IPAddress: member.Addr.String(), - Port: member.Tags[agent.MemberTagKeyAgentPort], - NodeRole: member.Tags[agent.MemberTagKeyNodeRole], - NodeName: member.Tags[agent.MemberTagKeyNodeName], + Port: member.Tags[memberTagKeyAgentPort], + NodeRole: member.Tags[memberTagKeyNodeRole], + NodeName: member.Tags[memberTagKeyNodeName], EdgeKeySet: false, } - _, ok := member.Tags[agent.MemberTagEdgeKeySet] + _, ok := member.Tags[memberTagKeyEdgeKeySet] if ok { clusterMember.EdgeKeySet = true } @@ -99,10 +113,16 @@ func (service *ClusterService) Members() []agent.ClusterMember { } // GetMemberByRole will return the first member with the specified role. -func (service *ClusterService) GetMemberByRole(role string) *agent.ClusterMember { +func (service *ClusterService) GetMemberByRole(role agent.NodeRole) *agent.ClusterMember { members := service.Members() + + roleString := memberTagValueNodeRoleManager + if role == agent.NodeRoleWorker { + roleString = memberTagValueNodeRoleWorker + } + for _, member := range members { - if member.NodeRole == role { + if member.NodeRole == roleString { return &member } } @@ -135,12 +155,41 @@ func (service *ClusterService) GetMemberWithEdgeKeySet() *agent.ClusterMember { } // UpdateTags propagate the new tags to the cluster -func (service *ClusterService) UpdateTags(tags map[string]string) error { +func (service *ClusterService) UpdateTags(tags *agent.InfoTags) error { service.tags = tags - return service.cluster.SetTags(tags) + tagsMap := convertTagsToMap(tags) + return service.cluster.SetTags(tagsMap) } // GetTags returns the tags associated to the service -func (service *ClusterService) GetTags() map[string]string { +func (service *ClusterService) GetTags() *agent.InfoTags { return service.tags } + +func convertTagsToMap(tags *agent.InfoTags) map[string]string { + tagsMap := map[string]string{} + + if tags.EdgeKeySet { + tagsMap[memberTagKeyEdgeKeySet] = "set" + } + + tagsMap[memberTagKeyEngineStatus] = memberTagValueEngineStatusStandalone + if tags.EngineStatus == agent.EngineStatusSwarm { + tagsMap[memberTagKeyEngineStatus] = memberTagValueEngineStatusSwarm + } + + tagsMap[memberTagKeyAgentPort] = tags.AgentPort + + if tags.Leader { + tagsMap[memberTagKeyIsLeader] = "1" + } + + tagsMap[memberTagKeyNodeName] = tags.NodeName + + tagsMap[memberTagKeyNodeRole] = memberTagValueNodeRoleManager + if tags.NodeRole == agent.NodeRoleWorker { + tagsMap[memberTagKeyNodeRole] = memberTagValueNodeRoleWorker + } + + return tagsMap +} diff --git a/setup.sh b/setup.sh new file mode 100755 index 00000000..733875f7 --- /dev/null +++ b/setup.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash + +PLATFORM=$1 +ARCH=$2 +DOCKER_VERSION=$3 + +PLATFORM="linux" +ARCH="x86_64" +DOCKER_VERSION="18.09.3" + +DOWNLOAD_FOLDER=".tmp/download" + +rm -rf "${DOWNLOAD_FOLDER}" +mkdir -pv "${DOWNLOAD_FOLDER}" + +if [ "${PLATFORM}" == 'win' ]; then + wget -O "${DOWNLOAD_FOLDER}/docker-binaries.zip" "https://download.docker.com/${PLATFORM}/static/stable/${ARCH}/docker-${DOCKER_VERSION}.zip" + unzip "${DOWNLOAD_FOLDER}/docker-binaries.zip" -d "${DOWNLOAD_FOLDER}" + mv "${DOWNLOAD_FOLDER}/docker/docker.exe" dist/ +else + wget -O "${DOWNLOAD_FOLDER}/docker-binaries.tgz" "https://download.docker.com/${PLATFORM}/static/stable/${ARCH}/docker-${DOCKER_VERSION}.tgz" + tar -xf "${DOWNLOAD_FOLDER}/docker-binaries.tgz" -C "${DOWNLOAD_FOLDER}" + mv "${DOWNLOAD_FOLDER}/docker/docker" dist/ +fi + +exit 0