Skip to content

Commit

Permalink
Proxy for datanode rest, jwt auth filter for datanode (#17827)
Browse files Browse the repository at this point in the history
* Proxy for datanode rest, jwt auth filter for datanode

* fixed test

* fixed test

* Fixed passing request body via datanode proxy

* disable entity capturing for datanode rest proxy calls

* datanode rest proxy refactored to okHttp client

* code cleanup, jwt token and auth filter separation

* Removed unused import

* code cleanup, fixed request body issue
  • Loading branch information
todvora authored Jan 9, 2024
1 parent 44aad48 commit b0afd62
Show file tree
Hide file tree
Showing 18 changed files with 617 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.datanode.initializers;

public interface AuthTokenValidator {
void verifyToken(String token) throws TokenVerificationException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.datanode.initializers;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerRequestFilter;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.Collections;
import java.util.Optional;

/**
* This is an authorization filter that first try to verify presence and validity of a bearer token. If there is no
* bearer token available, it will fallback to basic auth (or whatever filter is configured as fallback).
* Allowing both auth methods allows easy access directly from CLI or browser and machine-machine communication from the graylog server.
*/
public class DatanodeAuthFilter implements ContainerRequestFilter {

private static final Logger LOG = LoggerFactory.getLogger(DatanodeAuthFilter.class);
private static final String AUTHENTICATION_SCHEME = "Bearer";
private final ContainerRequestFilter fallbackFilter;
private final AuthTokenValidator tokenVerifier;


public DatanodeAuthFilter(ContainerRequestFilter fallbackFilter, AuthTokenValidator tokenVerifier) {
this.fallbackFilter = fallbackFilter;
this.tokenVerifier = tokenVerifier;
}

private Optional<String> getBearerHeader(ContainerRequestContext requestContext) {
final MultivaluedMap<String, String> headers = requestContext.getHeaders();
return headers.getOrDefault(HttpHeaders.AUTHORIZATION, Collections.emptyList())
.stream()
.filter(a -> a.startsWith(AUTHENTICATION_SCHEME))
.findFirst();
}

@Override
public void filter(ContainerRequestContext requestContext) throws IOException {
final Optional<String> header = getBearerHeader(requestContext);
if (header.isEmpty()) {
// no JWT token, we'll fallback to basic auth
fallbackFilter.filter(requestContext);
} else {
final String token = header.map(h -> h.replaceFirst(AUTHENTICATION_SCHEME + " ", "")).get();
try {
tokenVerifier.verifyToken(token);
} catch (TokenVerificationException e) {
LOG.error("Failed to verify auth token", e);
abortRequest(requestContext);
}
}
}


private void abortRequest(ContainerRequestContext requestContext) {
requestContext.abortWith(Response.status(Response.Status.UNAUTHORIZED)
.entity("Failed to parse auth header")
.type(MediaType.TEXT_PLAIN_TYPE)
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@
import org.graylog2.shared.rest.exceptionmappers.JacksonPropertyExceptionMapper;
import org.graylog2.shared.rest.exceptionmappers.JsonProcessingExceptionMapper;
import org.graylog2.shared.security.tls.KeyStoreUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Named;
import javax.net.ssl.SSLContext;
import javax.ws.rs.container.ContainerRequestFilter;
import javax.ws.rs.container.DynamicFeature;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.ext.ContextResolver;
Expand Down Expand Up @@ -229,8 +231,9 @@ private HttpServer setUp(URI listenUri,
final boolean isSecuredInstance = sslEngineConfigurator != null;
final ResourceConfig resourceConfig = buildResourceConfig(additionalResources);

if(isSecuredInstance) {
resourceConfig.register(new BasicAuthFilter(configuration.getRootUsername(), configuration.getRootPasswordSha2(), "Datanode"));
if (isSecuredInstance) {
resourceConfig.register(createAuthFilter(configuration));

}
resourceConfig.register(new SecuredNodeAnnotationFilter(configuration.isInsecureStartup()));

Expand Down Expand Up @@ -264,6 +267,13 @@ private HttpServer setUp(URI listenUri,
return httpServer;
}

@NotNull
private ContainerRequestFilter createAuthFilter(Configuration configuration) {
final ContainerRequestFilter basicAuthFilter = new BasicAuthFilter(configuration.getRootUsername(), configuration.getRootPasswordSha2(), "Datanode");
final AuthTokenValidator tokenVerifier = new JwtTokenValidator(configuration.getPasswordSecret());
return new DatanodeAuthFilter(basicAuthFilter, tokenVerifier);
}

private SSLEngineConfigurator buildSslEngineConfigurator(KeystoreInformation keystoreInformation)
throws GeneralSecurityException, IOException {
if (keystoreInformation == null || !Files.isRegularFile(keystoreInformation.location()) || !Files.isReadable(keystoreInformation.location())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.datanode.initializers;

import io.jsonwebtoken.Jwt;
import io.jsonwebtoken.JwtParser;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;

import javax.crypto.spec.SecretKeySpec;
import javax.inject.Named;
import javax.inject.Singleton;
import java.nio.charset.StandardCharsets;
import java.security.Key;
import java.util.Optional;

@Singleton
public class JwtTokenValidator implements AuthTokenValidator {

public static final String REQUIRED_SUBJECT = "admin";
public static final String REQUIRED_ISSUER = "graylog";
private final String signingKey;

public JwtTokenValidator(@Named("password_secret") final String signingKey) {
this.signingKey = signingKey;
}

@Override
public void verifyToken(String token) throws TokenVerificationException {
SignatureAlgorithm signatureAlgorithm = SignatureAlgorithm.HS256;
Key signingKey = new SecretKeySpec(this.signingKey.getBytes(StandardCharsets.UTF_8), signatureAlgorithm.getJcaName());
final JwtParser parser = Jwts.parserBuilder()
.setSigningKey(signingKey)
.requireSubject(REQUIRED_SUBJECT)
.requireIssuer(REQUIRED_ISSUER)
.build();
try {
final Jwt parsed = parser.parse(token);
verifySignature(parsed, signatureAlgorithm);
} catch (Exception e) {
throw new TokenVerificationException(e);
}
}

private void verifySignature(Jwt token, SignatureAlgorithm expectedAlgorithm) {
final SignatureAlgorithm usedAlgorithm = Optional.of(token.getHeader())
.map(h -> h.get("alg"))
.map(Object::toString)
.map(SignatureAlgorithm::forName)
.orElseThrow(() -> new IllegalArgumentException("Token doesn't provide valid signature algorithm"));

if (expectedAlgorithm != usedAlgorithm) {
throw new IllegalArgumentException("Token is using unsupported signature algorithm :" + usedAlgorithm);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.datanode.initializers;

public class TokenVerificationException extends Exception {
public TokenVerificationException(Exception cause) {
super(cause);
}

public TokenVerificationException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public interface OpensearchProcess extends ManagableProcess<OpensearchConfigurat

URI getOpensearchBaseUrl();
String getOpensearchClusterUrl();
String getDatanodeRestApiUrl();

void onRemove();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
Expand Down Expand Up @@ -124,6 +125,15 @@ public String getOpensearchClusterUrl() {
return configuration.getDatanodeNodeName() + ":" + configuration.getOpensearchTransportPort();
}

@Override
public String getDatanodeRestApiUrl() {
final boolean secured = opensearchConfiguration.map(OpensearchConfiguration::securityConfigured).orElse(false);
String protocol = secured ? "https" : "http";
String host = configuration.getHostname();
final int port = configuration.getDatanodeHttpPort();
return String.format(Locale.ROOT, "%s://%s:%d", protocol, host, port);
}

public void onEvent(ProcessEvent event) {
LOG.debug("Process event: " + event);
this.processState.fire(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,24 @@ public class NodePingPeriodical extends Periodical {
private final NodeId nodeId;
private final Supplier<URI> opensearchBaseUri;
private final Supplier<String> opensearchClusterUri;
private final Supplier<String> datanodeRestApiUri;
private final Supplier<Boolean> isLeader;
private final Configuration configuration;
private final Supplier<ProcessState> processState;


@Inject
public NodePingPeriodical(NodeService<DataNodeDto> nodeService, NodeId nodeId, Configuration configuration, OpensearchProcess managedOpenSearch) {
this(nodeService, nodeId, configuration, managedOpenSearch::getOpensearchBaseUrl, managedOpenSearch::getOpensearchClusterUrl, managedOpenSearch::isLeaderNode, () -> managedOpenSearch.processInfo().state());
this(
nodeService,
nodeId,
configuration,
managedOpenSearch::getOpensearchBaseUrl,
managedOpenSearch::getOpensearchClusterUrl,
managedOpenSearch::getDatanodeRestApiUrl,
managedOpenSearch::isLeaderNode,
() -> managedOpenSearch.processInfo().state()
);
}

NodePingPeriodical(
Expand All @@ -55,13 +65,15 @@ public NodePingPeriodical(NodeService<DataNodeDto> nodeService, NodeId nodeId, C
Configuration configuration,
Supplier<URI> opensearchBaseUri,
Supplier<String> opensearchClusterUri,
Supplier<String> datanodeRestApiUri,
Supplier<Boolean> isLeader,
Supplier<ProcessState> processState
) {
this.nodeService = nodeService;
this.nodeId = nodeId;
this.opensearchBaseUri = opensearchBaseUri;
this.opensearchClusterUri = opensearchClusterUri;
this.datanodeRestApiUri = datanodeRestApiUri;
this.isLeader = isLeader;
this.configuration = configuration;
this.processState = processState;
Expand Down Expand Up @@ -117,6 +129,7 @@ public void doRun() {
.setClusterAddress(opensearchClusterUri.get())
.setDataNodeStatus(processState.get().getDataNodeStatus())
.setHostname(configuration.getHostname())
.setRestApiAddress(datanodeRestApiUri.get())
.build();

nodeService.ping(dto);
Expand Down
Loading

0 comments on commit b0afd62

Please sign in to comment.