Skip to content

Commit

Permalink
Merge branch 'master' into update-stream-telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
gally47 authored Oct 1, 2024
2 parents 06b95ce + 5e0e886 commit d1b0565
Show file tree
Hide file tree
Showing 15 changed files with 323 additions and 237 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.codahale.metrics.RatioGauge;
import com.github.joschi.jadconfig.util.Size;
import com.google.common.eventbus.EventBus;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.graylog2.notifications.Notification;
import org.graylog2.notifications.NotificationService;
import org.graylog2.plugin.GlobalMetricNames;
Expand All @@ -33,9 +35,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Inject;
import jakarta.inject.Named;

import static org.graylog2.shared.metrics.MetricUtils.safelyRegister;

/**
Expand Down Expand Up @@ -236,7 +235,7 @@ public void doRun() {
eventBus.post(throttleState);

// Abusing the current thread to send notifications from KafkaJournal in the graylog2-shared module
final double journalUtilizationPercentage = throttleState.journalSizeLimit > 0 ? (throttleState.journalSize * 100) / throttleState.journalSizeLimit : 0.0;
final double journalUtilizationPercentage = journal.getJournalUtilization().orElse(0.0);

if (journalUtilizationPercentage > LocalKafkaJournal.NOTIFY_ON_UTILIZATION_PERCENTAGE) {
Notification notification = notificationService.buildNow()
Expand Down
15 changes: 8 additions & 7 deletions graylog2-server/src/main/java/org/graylog2/plugin/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.codahale.metrics.Meter;
import com.eaio.uuid.UUID;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.CharMatcher;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
Expand Down Expand Up @@ -64,9 +65,10 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static com.google.common.base.CharMatcher.anyOf;
import static com.google.common.base.CharMatcher.inRange;
import static com.google.common.base.Predicates.equalTo;
import static com.google.common.base.Predicates.not;
import static org.graylog.schema.GraylogSchemaFields.FIELD_ILLUMINATE_EVENT_CATEGORY;
Expand Down Expand Up @@ -216,10 +218,9 @@ public class Message implements Messages, Indexable, Acknowledgeable {
public static final String FIELD_GL2_SOURCE_RADIO_INPUT = "gl2_source_radio_input";

// Matches whole field names containing a-z, A-Z, 0-9, period char, -, or @.
private static final Pattern VALID_KEY_CHARS = Pattern.compile("^[\\w\\.\\-@]*$");
// Same as above, but matches only the invalid (non-indicated) characters.
// [^ ... ] around the pattern inverts the match.
private static final Pattern INVALID_KEY_CHARS = Pattern.compile("[^\\w\\.\\-@]");
private static final CharMatcher VALID_KEY_CHAR_MATCHER = inRange('a', 'z').or(inRange('A', 'Z')).or(inRange('0', '9')).or(anyOf(".@-_")).precomputed();
private static final CharMatcher INVALID_KEY_CHAR_MATCHER = VALID_KEY_CHAR_MATCHER.negate().precomputed();

private static final char KEY_REPLACEMENT_CHAR = '_';

private static final ImmutableSet<String> GRAYLOG_FIELDS = ImmutableSet.of(
Expand Down Expand Up @@ -652,11 +653,11 @@ public long getSize() {
}

public static boolean validKey(final String key) {
return VALID_KEY_CHARS.matcher(key).matches();
return VALID_KEY_CHAR_MATCHER.matchesAllOf(key);
}

public static String cleanKey(final String key) {
return INVALID_KEY_CHARS.matcher(key).replaceAll(String.valueOf(KEY_REPLACEMENT_CHAR));
return INVALID_KEY_CHAR_MATCHER.replaceFrom(key, KEY_REPLACEMENT_CHAR);
}

public void addFields(final Map<String, Object> fields) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.graylog2.shared.journal;

import java.util.List;
import java.util.Optional;

public interface Journal {
Entry createEntry(byte[] idBytes, byte[] messageBytes);
Expand All @@ -31,6 +32,26 @@ public interface Journal {

void flush();

/**
* Returns an {@code Optional} containing the current journal utilization as a percentage of the
* maximum retention size. This default implementation returns an empty {@code Optional},
* indicating that no utilization data is available.
*
* @return an {@code Optional<Double>} representing the journal utilization percentage,
* or an empty {@code Optional} if utilization data is unavailable.
*/
default Optional<Double> getJournalUtilization() {
return Optional.empty();
}

/**
* Executes the retention policy on the journal, deleting outdated or excess data based on the
* configured retention rules.
*
* @return an integer representing the amount of data deleted during the retention process.
*/
int runRetention();

class Entry {
private final byte[] idBytes;
private final byte[] messageBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Uninterruptibles;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import org.graylog.shaded.kafka09.common.KafkaException;
import org.graylog.shaded.kafka09.common.OffsetOutOfRangeException;
Expand Down Expand Up @@ -63,10 +66,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
Expand All @@ -83,6 +82,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -166,6 +166,7 @@ public void sleep(long ms) {
private final LogRetentionCleaner logRetentionCleaner;
private final long maxSegmentSize;
private final int maxMessageSize;
private final long maxRetentionSize;
private final String metricPrefix;

private long nextReadOffset = 0L;
Expand Down Expand Up @@ -225,6 +226,7 @@ public LocalKafkaJournal(Path journalDirectory,
this.maxSegmentSize = segmentSize.toBytes();
// Max message size should not be bigger than max segment size.
this.maxMessageSize = Ints.saturatedCast(maxSegmentSize);
this.maxRetentionSize = retentionSize.toBytes();
this.metricPrefix = metricPrefix;
this.metricRegistry = metricRegistry;

Expand Down Expand Up @@ -795,6 +797,31 @@ public long getNextReadOffset() {
return nextReadOffset;
}

/**
* Returns the current utilization of the journal as a percentage of the maximum retention size.
* This method calculates how much of the maximum retention size is currently being utilized
* based on the size of the Kafka log.
*
* @return the journal utilization as a percentage, or 0.0 if the max retention size is zero.
*/
@Override
public Optional<Double> getJournalUtilization() {
return Optional.of(calculateUtilization(maxRetentionSize, kafkaLog.size()));
}

/**
* Calculates the percentage utilization of the journal.
* This method computes the utilization as a percentage by dividing the Kafka log size by the
* maximum retention size and multiplying by 100.
*
* @param maxRetentionSize the maximum retention size of the journal
* @param kafkaLogSize the current size of the Kafka log
* @return an {@code Optional<Double>} containing the journal utilization as a percentage.
*/
private double calculateUtilization(long maxRetentionSize, long kafkaLogSize) {
return maxRetentionSize > 0 ? (double) (kafkaLogSize * 100) / maxRetentionSize : 0.0;
}

@Override
protected void startUp() throws Exception {
// do NOT let Kafka's LogManager create its management threads, we will run them ourselves.
Expand Down Expand Up @@ -844,7 +871,19 @@ protected void shutDown() throws Exception {
teardownLogMetrics();
}

public int cleanupLogs() {
/**
* Executes the retention policy on the journal by invoking the {@code logRetentionCleaner}.
* This method handles the following tasks:
* <ul>
* <li>Cleans up expired log segments that have exceeded the configured retention time.</li>
* <li>Ensures that segment sizes are maintained within the maximum allowable limits.</li>
* <li>Removes committed log segments that are no longer needed,</li>
* </ul>
*
* @return an integer representing the total amount of data deleted by the {@code logRetentionCleaner}
*/
@Override
public int runRetention() {
try {
return logRetentionCleaner.call();
} catch (Exception e) {
Expand Down Expand Up @@ -975,7 +1014,7 @@ public void run() {
}

@Override
public Integer call() throws Exception {
public synchronized Integer call() throws Exception {
loggerForCleaner.debug("Beginning log cleanup");
int total = 0;
final Timer.Context ctx = new Timer().time();
Expand Down Expand Up @@ -1045,7 +1084,7 @@ private void updateLoadBalancerStatus(double utilizationPercentage) {
private int cleanupSegmentsToMaintainSize(Log kafkaLog) {
final long retentionSize = kafkaLog.config().retentionSize();
final long currentSize = kafkaLog.size();
final double utilizationPercentage = retentionSize > 0 ? (currentSize * 100) / retentionSize : 0.0;
final double utilizationPercentage = calculateUtilization(retentionSize, currentSize);
if (utilizationPercentage > LocalKafkaJournal.NOTIFY_ON_UTILIZATION_PERCENTAGE) {
LOG.warn("Journal utilization ({}%) has gone over {}%.", utilizationPercentage,
LocalKafkaJournal.NOTIFY_ON_UTILIZATION_PERCENTAGE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,10 @@ protected void shutDown() throws Exception {
public void flush() {
// nothing to do
}

@Override
public int runRetention() {
// nothing to do here returning -1
return -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ public void testValidKeys() throws Exception {
assertTrue(Message.validKey("foo@bar"));
assertTrue(Message.validKey("123"));
assertTrue(Message.validKey(""));

assertFalse(Message.validKey(" foo123"));
assertFalse(Message.validKey("foo123 "));
assertFalse(Message.validKey("foo bar"));
assertFalse(Message.validKey("foo+bar"));
assertFalse(Message.validKey("foo$bar"));
Expand All @@ -381,6 +382,7 @@ public void testCleanKey() throws Exception {
assertEquals("foo_bar", Message.cleanKey("foo{bar"));
assertEquals("foo_bar", Message.cleanKey("foo,bar"));
assertEquals("foo_bar", Message.cleanKey("foo?bar"));
assertEquals("foo___bar", Message.cleanKey("foo +?bar"));
assertEquals("_", Message.cleanKey(" "));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ public void segmentSizeCleanup() throws Exception {

assertEquals(3, countSegmentsInDir(messageJournalDir));

final int cleanedLogs = journal.cleanupLogs();
final int cleanedLogs = journal.runRetention();
assertEquals(1, cleanedLogs);

final int numberOfSegments = countSegmentsInDir(messageJournalDir);
Expand Down Expand Up @@ -366,14 +366,14 @@ public void segmentAgeCleanup() throws Exception {
i++;
}

int cleanedLogs = journal.cleanupLogs();
int cleanedLogs = journal.runRetention();
assertEquals("no segments should've been cleaned", 0, cleanedLogs);
assertEquals("two segments segment should remain", 2, countSegmentsInDir(messageJournalDir));

// move clock beyond the retention period and clean again
clock.tick(Period.seconds(120));

cleanedLogs = journal.cleanupLogs();
cleanedLogs = journal.runRetention();
assertEquals("two segments should've been cleaned (only one will actually be removed...)", 2, cleanedLogs);
assertEquals("one segment should remain", 1, countSegmentsInDir(messageJournalDir));

Expand Down Expand Up @@ -407,23 +407,23 @@ public void segmentCommittedCleanup() throws Exception {
assertEquals(3, countSegmentsInDir(messageJournalDir));

// we haven't committed any offsets, this should not touch anything.
final int cleanedLogs = journal.cleanupLogs();
final int cleanedLogs = journal.runRetention();
assertEquals(0, cleanedLogs);

final int numberOfSegments = countSegmentsInDir(messageJournalDir);
assertEquals(3, numberOfSegments);

// mark first half of first segment committed, should not clean anything
journal.markJournalOffsetCommitted(bulkSize / 2);
assertEquals("should not touch segments", 0, journal.cleanupLogs());
assertEquals("should not touch segments", 0, journal.runRetention());
assertEquals(3, countSegmentsInDir(messageJournalDir));

journal.markJournalOffsetCommitted(bulkSize + 1);
assertEquals("first segment should've been purged", 1, journal.cleanupLogs());
assertEquals("first segment should've been purged", 1, journal.runRetention());
assertEquals(2, countSegmentsInDir(messageJournalDir));

journal.markJournalOffsetCommitted(bulkSize * 4);
assertEquals("only purge one segment, not the active one", 1, journal.cleanupLogs());
assertEquals("only purge one segment, not the active one", 1, journal.runRetention());
assertEquals(1, countSegmentsInDir(messageJournalDir));
}

Expand Down Expand Up @@ -477,7 +477,7 @@ public void serverStatusThrottledIfJournalUtilizationIsHigherThanThreshold() thr

createBulkChunks(journal, segmentSize, 4);
journal.flushDirtyLogs();
journal.cleanupLogs();
journal.runRetention();
assertThat(serverStatus.getLifecycle()).isEqualTo(Lifecycle.THROTTLED);
}

Expand All @@ -499,7 +499,7 @@ public void serverStatusUnthrottledIfJournalUtilizationIsLowerThanThreshold() th
serverStatus);

journal.flushDirtyLogs();
journal.cleanupLogs();
journal.runRetention();
assertThat(serverStatus.getLifecycle()).isEqualTo(Lifecycle.RUNNING);
}

Expand Down Expand Up @@ -577,4 +577,35 @@ public void readNext() throws Exception {
}
);
}

@Test
public void testJournalUtilizationSize() {
Size retentionSize = Size.kilobytes(10L);
Size segmentSize = Size.kilobytes(1l);
final LocalKafkaJournal journal = new LocalKafkaJournal(journalDirectory.toPath(),
scheduler,
segmentSize,
Duration.standardHours(1),
retentionSize,
Duration.standardDays(1),
1_000_000,
Duration.standardMinutes(1),
LocalKafkaJournal.THRESHOLD_THROTTLING_DISABLED,
new MetricRegistry(),
serverStatus);

//No message in the journal utilization should be zero
assertThat(journal.getJournalUtilization().get()).isZero();

//After writing messages utilization should be positive
final int bulkSize = createBulkChunks(journal, segmentSize, 3);
double utilizationAfterBulk = journal.getJournalUtilization().get();
assertThat(utilizationAfterBulk).isPositive();

//Utilization should decrease again when clean up kicked in
journal.markJournalOffsetCommitted(bulkSize);
final int cleanedLogs = journal.runRetention();
assertEquals(1, cleanedLogs);
assertThat(journal.getJournalUtilization().get()).isLessThan(utilizationAfterBulk);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
"dependencies": {
"@babel/eslint-parser": "7.16.5",
"@tanstack/eslint-plugin-query": "4.36.1",
"@typescript-eslint/eslint-plugin": "8.7.0",
"@typescript-eslint/parser": "8.7.0",
"@typescript-eslint/eslint-plugin": "8.8.0",
"@typescript-eslint/parser": "8.8.0",
"eslint": "8.57.0",
"eslint-config-airbnb": "19.0.4",
"eslint-import-resolver-webpack": "0.13.9",
Expand Down
2 changes: 1 addition & 1 deletion graylog2-web-interface/src/components/common/Section.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ const Section = ({ title, actions, headerLeftSection, collapsible, defaultClosed
<Header $opened={opened} $collapsible={collapsible} onClick={onHeaderClick}>
<FlexWrapper>
<h2>{title}</h2>
{headerLeftSection && <FlexWrapper>{headerLeftSection}</FlexWrapper>}
{headerLeftSection && <FlexWrapper onClick={(e) => { e.stopPropagation(); }}>{headerLeftSection}</FlexWrapper>}
</FlexWrapper>
<FlexWrapper>
{actions && <div>{actions}</div>}
Expand Down
Loading

0 comments on commit d1b0565

Please sign in to comment.