Skip to content

Commit

Permalink
High availability support (#184)
Browse files Browse the repository at this point in the history
* update gitignore

* update gitignore

* uploading log to shared state

* checking time difference and active status

* splitting uploading to different method

* moved to BaseExtractor

* moved method to scheduler

* simplified some logic

* some cleanup

* switched to UtcNow

* refactoring and cancelling if multiple active extractors

* clean-up

* added ExtractorState

* some refactoring

* refactor

* some name changes and refactor

* made ExtractorState class more general

* refactor

* added test for extractors

* updated tests

* test update

* updated tests with comments

* some error handling and refactor

* added config class

* refactor

* refactored UpdateStateAtInterval

* refactor

* more refactoring

* changed file structure

* moved some functions

* fix

* fix

* namechanges

* reverted Program.cs

* created test class

* added new test file

* name change

* moved config to config file

* fixes

* started on unit test

* rm spaces

* space

* format

* created UploadLogToState test and MyConfig class

* added TestUpdateExtractorState

* cleanup

* continued work on TestUpdateState

* testing ShouldBecomeActive

* refactor

* cleaned up unit test

* clean

* cleanup

* Changed console to logging

* added documentation

* more documentation

* fixes

* test

* update gitignore

* rm track

* rm

* rm comment

* updated config file

* start of integration test

* added another integration test

* interval change

* documentation update

* added config for interval

* updates to test and refactor

* add warning

* revert file

* revert

* new line at end of file

* format

* created initial fix

* revert changed files

* rm console write

* updated periodic schedule

* limit update

* made interval null by default

* changed type and name of min interval

* fix multiple extractors bug

* merging scheduler fix

* fixes based on pr

* new folder structure

* minor fix

* made HighAvailabilityManager abstract

* clean-up

* rename test

* rm folder

* moved cron wrapper to utils

* checking for active extractor first

* minor fixes

* update default threshold

Co-authored-by: Kjerand Evje <[email protected]>
  • Loading branch information
kjerand and Kjerand Evje authored Aug 5, 2022
1 parent 7a302b1 commit 0deab7e
Show file tree
Hide file tree
Showing 13 changed files with 1,177 additions and 6 deletions.
65 changes: 65 additions & 0 deletions Cognite.Common/HighAvailability.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System;
using System.Threading.Tasks;
using System.Collections.Generic;

namespace Cognite.Extractor.Common
{
/// <summary>
/// Interface for a manager used to add high availability to an extractor.
/// </summary>
public interface IHighAvailabilityManager
{
/// <summary>
/// Method used to add high availability to an extractor.
/// NOTE(review): judging by the name, the returned task is expected to complete
/// once this extractor should take over as the active one - confirm against
/// the concrete implementations.
/// </summary>
/// <returns>A task that can be awaited by the extractor before it starts its work.</returns>
Task WaitToBecomeActive();
}

/// <summary>
/// Interface for the instance of an extractor, as tracked by the
/// high availability state (see <see cref="ExtractorState.CurrentState"/>).
/// </summary>
public interface IExtractorInstance
{
/// <summary>
/// The index of the extractor.
/// NOTE(review): presumably the value configured as <c>index</c> under
/// <c>high-availability</c> in the config file - confirm.
/// </summary>
int Index { get; set; }

/// <summary>
/// The time when the extractor was last updated.
/// NOTE(review): presumably UTC - confirm against the code that writes it.
/// </summary>
DateTime TimeStamp { get; set; }

/// <summary>
/// The active status of the extractor.
/// </summary>
bool Active { get; set; }
}

/// <summary>
/// Container for the known state of the extractor instances, together with the
/// active status this extractor reports for itself.
/// </summary>
public class ExtractorState
{
    /// <summary>
    /// Creates a state object that starts out with no known extractor instances.
    /// </summary>
    /// <param name="initialStatus">The initial active status of the extractor.</param>
    public ExtractorState(bool initialStatus = false)
    {
        UpdatedStatus = initialStatus;
    }

    /// <summary>
    /// State of the current extractors. Starts as an empty list.
    /// </summary>
    public List<IExtractorInstance> CurrentState { get; set; } = new List<IExtractorInstance>();

    /// <summary>
    /// Value used by the extractor to update its own active status.
    /// </summary>
    public bool UpdatedStatus { get; set; }
}
}
16 changes: 12 additions & 4 deletions ExampleExtractor/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,19 @@
using Cognite.Extractor.Utils.CommandLine;
using System.CommandLine;

class MyExtractor : BaseExtractor<BaseConfig>
class MyExtractor : BaseExtractor<MyConfig>
{
public MyExtractor(BaseConfig config, IServiceProvider provider, CogniteDestination destination, ExtractionRun run)
public MyExtractor(MyConfig config, IServiceProvider provider, CogniteDestination destination, ExtractionRun run)
: base(config, provider, destination, run)
{
if (run != null) run.Continuous = true;
}

protected override async Task Start()
{
// Adding high availability to the extractor.
await RunWithHighAvailabilityAndWait(Config.HighAvailability).ConfigureAwait(false);

var result = await Destination.EnsureTimeSeriesExistsAsync(new[]
{
new TimeSeriesCreate {
Expand All @@ -38,6 +41,11 @@ protected override async Task Start()
}
}

class MyConfig : BaseConfig
{
public HighAvailabilityConfig HighAvailability { get; set; }
}

// Class for flat command line arguments
class Options
{
Expand All @@ -60,8 +68,8 @@ static async Task Main(string[] args)
command.SetHandler<Options>(async opt =>
{
// This can also be invoked directly in main, to not have a CLI.
await ExtractorRunner.Run<BaseConfig, MyExtractor>(
configPath: opt.ConfigPath ?? "config.yml",
await ExtractorRunner.Run<MyConfig, MyExtractor>(
configPath: opt.ConfigPath ?? "./ExampleExtractor/config.yml",
acceptedConfigVersions: new[] { 1 },
appId: "my-extractor",
userAgent: "myextractor/1.0.0",
Expand Down
7 changes: 6 additions & 1 deletion ExampleExtractor/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,9 @@ cognite:
tenant: ${BF_TEST_TENANT}
secret: ${BF_TEST_SECRET}
scopes:
- ${BF_TEST_SCOPE}
- ${BF_TEST_SCOPE}
high-availability:
index: 1
raw:
database-name: ${BF_TEST_DB}
table-name: ${BF_TEST_TABLE}
213 changes: 213 additions & 0 deletions ExtractorUtils.Test/integration/RawHighAvailabilityTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
using Cognite.Extensions;
using Cognite.Extractor.Utils;
using CogniteSdk;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using Xunit;
using Xunit.Abstractions;
using Microsoft.Extensions.DependencyInjection;

namespace ExtractorUtils.Test.Integration
{
/// <summary>
/// Minimal extractor used by the high availability integration tests.
/// It waits on the high availability call and then continuously pushes
/// datapoints to a single "sine-wave" time series.
/// </summary>
class MyExtractor : BaseExtractor<MyTestConfig>
{
    public MyExtractor(MyTestConfig config, IServiceProvider provider, CogniteDestination destination, ExtractionRun run)
        : base(config, provider, destination, run)
    {
        if (run == null) return;

        run.Continuous = true;
    }

    /// <summary>
    /// Runs the high availability step with short test intervals, then makes sure
    /// the time series exists and schedules the periodic datapoint upload.
    /// </summary>
    protected override async Task Start()
    {
        // Short intervals so that fail-over happens within the test timeouts.
        var updateInterval = TimeSpan.FromSeconds(2);
        var threshold = TimeSpan.FromSeconds(4);

        await RunWithHighAvailabilityAndWait(
            Config.HighAvailability,
            interval: updateInterval,
            inactivityThreshold: threshold).ConfigureAwait(false);

        var sineWave = new TimeSeriesCreate
        {
            ExternalId = "sine-wave",
            Name = "Sine Wave"
        };

        var result = await Destination
            .EnsureTimeSeriesExistsAsync(new[] { sineWave }, RetryMode.OnError, SanitationMode.Clean, Source.Token)
            .ConfigureAwait(false);
        result.ThrowOnFatal();

        CreateTimeseriesQueue(1000, TimeSpan.FromSeconds(1), null);

        ScheduleDatapointsRun("datapoints", TimeSpan.FromMilliseconds(100), token =>
        {
            IEnumerable<(Identity, Datapoint)> points = new[]
            {
                (
                    Identity.Create("sine-wave"),
                    new Datapoint(DateTime.UtcNow, Math.Sin(DateTime.UtcNow.Ticks))
                )
            };
            return Task.FromResult(points);
        });
    }
}

/// <summary>
/// Configuration used by the integration test extractor: the base config
/// extended with a high availability section.
/// </summary>
class MyTestConfig : BaseConfig
{
/// <summary>
/// High availability settings passed to RunWithHighAvailabilityAndWait.
/// NOTE(review): presumably populated from the "high-availability" section
/// of the config file - confirm the key mapping.
/// </summary>
public HighAvailabilityConfig HighAvailability { get; set; }
}

/// <summary>
/// Integration tests that run several extractor instances against the same
/// Raw database and table, and turn them off one by one to exercise the
/// high availability hand-over.
/// </summary>
public class RawHighAvailabilityTest
{
    private readonly ITestOutputHelper _output;

    private readonly string _dbName = "test-db-integration";

    private readonly string _tableName = "test-table-integration";

    public RawHighAvailabilityTest(ITestOutputHelper output)
    {
        _output = output;
    }

    /// <summary>
    /// Runs two extractors at the same time and turns them off one by one.
    /// </summary>
    [Fact(Timeout = 30000)]
    public async Task TestExtractorManagerRun()
    {
        // Creating configs for two different extractors.
        string configPath_0 = SetupConfig(index: 0);
        string configPath_1 = SetupConfig(index: 1);

        using var source_0 = new CancellationTokenSource();
        using var source_1 = new CancellationTokenSource();

        // Creating two extractors.
        Task extractor_0 = CreateExtractor(configPath_0, source_0.Token);
        Task extractor_1 = CreateExtractor(configPath_1, source_1.Token);

        // Turning off extractor 0 after 15s and 1 after 25s.
        Task cancel_0 = TurnOffAfterDelay(15000, source_0);
        Task cancel_1 = TurnOffAfterDelay(25000, source_1);

        try
        {
            // Testing running two extractors at the same time, then turning them off one by one.
            // When the first extractor is turned off the second extractor will become active.
            Task tasks = Task.WhenAll(extractor_0, extractor_1, cancel_0, cancel_1);
            await tasks;
            Assert.True(tasks.IsCompleted);
        }
        finally
        {
            await Cleanup(configPath_0, configPath_1);
        }
    }

    /// <summary>
    /// Runs two extractors, turns off extractor 0, starts a new copy of
    /// extractor 0 later on, and then turns off the remaining extractors.
    /// </summary>
    [Fact(Timeout = 45000)]
    public async Task TestRestartExtractor()
    {
        // Creating config for two extractors.
        string configPath_0 = SetupConfig(index: 0);
        string configPath_1 = SetupConfig(index: 1);

        using var source_0 = new CancellationTokenSource();
        using var source_1 = new CancellationTokenSource();
        using var source_2 = new CancellationTokenSource();

        // Creating extractor 0 and 1.
        Task extractor_0 = CreateExtractor(configPath_0, source_0.Token);
        Task extractor_1 = CreateExtractor(configPath_1, source_1.Token);
        // Creating a copy of extractor 0 that will start after 20s.
        Task restart_0 = CreateExtractor(configPath_0, source_2.Token, delay: 20000);

        // Turning off extractor 0 after 10s, 1 after 30s and the restarted 0 after 40s.
        Task cancel_0 = TurnOffAfterDelay(10000, source_0);
        Task cancel_1 = TurnOffAfterDelay(30000, source_1);
        Task cancel_2 = TurnOffAfterDelay(40000, source_2);

        try
        {
            // Running two extractors.
            // When extractor 0 is turned off, extractor 1 will start.
            // Then when extractor 0 is restarted it will go into standby.
            // Lastly when extractor 1 is turned off, 0 will start again.
            Task tasks = Task.WhenAll(extractor_0, extractor_1, cancel_0, cancel_1, cancel_2, restart_0);
            await tasks;

            Assert.True(tasks.IsCompleted);
        }
        finally
        {
            await Cleanup(configPath_0, configPath_1);
        }
    }

    /// <summary>
    /// Writes a config file for an extractor with the given index by appending a
    /// high-availability section to the shared test config.
    /// </summary>
    /// <param name="index">Index written to the high-availability section.</param>
    /// <returns>Path of the config file that was written.</returns>
    private string SetupConfig(int index)
    {
        var config = CDFTester.GetConfig(CogniteHost.BlueField);
        string[] lines = {
            "high-availability:",
            $"  index: {index}",
            $"  raw:",
            $"    database-name: {_dbName}",
            $"    table-name: {_tableName}"};

        string path = $"test-extractor-manager-{index}-config";
        System.IO.File.WriteAllLines(path, config.Concat(lines).ToArray());

        return path;
    }

    /// <summary>
    /// Runs an extractor, optionally after a delay, and completes when the
    /// extractor run itself has finished.
    /// </summary>
    /// <param name="configPath">Path to the config file of the extractor.</param>
    /// <param name="token">Token used to turn the extractor off.</param>
    /// <param name="delay">Milliseconds to wait before starting the extractor.</param>
    /// <returns>
    /// A task covering the whole extractor run. Returning the run itself (and not a
    /// task wrapping it) matters: the tests pass this task to Task.WhenAll, which must
    /// wait for the extractor to shut down and observe any failure it throws.
    /// </returns>
    private async Task CreateExtractor(string configPath, CancellationToken token, int delay = 0)
    {
        try
        {
            if (delay > 0) await Task.Delay(delay, token);

            // Task.Run keeps the extractors running concurrently even if the
            // runner does synchronous work before its first await.
            await Task.Run(async () =>
                await ExtractorRunner.Run<MyTestConfig, MyExtractor>(
                    configPath,
                    null,
                    "test-extractor-manager",
                    null,
                    false,
                    true,
                    false,
                    false,
                    token).ConfigureAwait(false));
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            // Turning the extractor off through its token is the expected way
            // for it to stop in these tests, so this is not a failure.
        }
    }

    /// <summary>
    /// Cancels the given source after a delay.
    /// </summary>
    /// <param name="delay">Milliseconds to wait before cancelling.</param>
    /// <param name="source">Source to cancel.</param>
    private Task TurnOffAfterDelay(int delay, CancellationTokenSource source)
    {
        return Task.Run(async () =>
        {
            await Task.Delay(delay);
            source.Cancel();
        });
    }

    /// <summary>
    /// Deletes the test database and then the temporary config files. The files
    /// are removed even if deleting the database fails.
    /// </summary>
    /// <param name="configPaths">
    /// Config files to delete; the first one is also used to build the client
    /// that deletes the database.
    /// </param>
    private async Task Cleanup(params string[] configPaths)
    {
        try
        {
            await DeleteDatabase(configPaths[0]);
        }
        finally
        {
            foreach (string configPath in configPaths)
            {
                System.IO.File.Delete(configPath);
            }
        }
    }

    /// <summary>
    /// Deletes the Raw database used by the tests, including its tables.
    /// </summary>
    /// <param name="path">Path to a config file used to set up the Cognite client.</param>
    private async Task DeleteDatabase(string path)
    {
        var services = new ServiceCollection();
        services.AddConfig<MyTestConfig>(path, 2);
        services.AddCogniteClient("testApp");

        using (var provider = services.BuildServiceProvider())
        {
            var destination = provider.GetRequiredService<CogniteDestination>();
            await destination.CogniteClient.Raw.DeleteDatabasesAsync(new RawDatabaseDelete
            {
                Items = new[]
                {
                    new RawDatabase { Name = _dbName }
                },
                Recursive = true
            });
        }
    }
}
}
Loading

0 comments on commit 0deab7e

Please sign in to comment.