Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[APIPUB-69] - Implements Reverse Paging. #70

Merged
merged 8 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/EdFi.Tools.ApiPublisher.Cli/apiPublisherSettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"changeVersionPagingWindowSize": 25000,
"enableRateLimit": false,
"rateLimitNumberExecutions": 100,
"rateLimitTimeLimitMinutes": 1
"rateLimitTimeLimitMinutes": 1,
"useReversePaging": false
},
"authorizationFailureHandling": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,18 @@ protected override void Load(ContainerBuilder builder)
// Register resource page message producer using a ChangeVersion paging strategy
if (options.UseChangeVersionPaging)
{
builder.RegisterType<EdFiApiChangeVersionPagingStreamResourcePageMessageProducer>()
if (options.useReversePaging)
{
builder.RegisterType<EdFiApiChangeVersionReversePagingStreamResourcePageMessageProducer>()
.As<IStreamResourcePageMessageProducer>()
.SingleInstance();
}
else
{
builder.RegisterType<EdFiApiChangeVersionPagingStreamResourcePageMessageProducer>()
.As<IStreamResourcePageMessageProducer>()
.SingleInstance();
}
}
// Register resource page message producer using a limit/offset paging strategy
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,26 @@ public async Task<IEnumerable<TProcessDataMessage>> HandleStreamResourcePageAsyn
break;
}

// Perform limit/offset final page check (for need for possible continuation)
if (message.IsFinalPage && JArray.Parse(responseContent).Count == limit)
if (!options.useReversePaging)
{
if (_logger.IsEnabled(LogEventLevel.Debug))
// Perform limit/offset final page check (for need for possible continuation)
if (message.IsFinalPage && JArray.Parse(responseContent).Count == limit)
{
_logger.Debug($"{message.ResourceUrl}: Final page was full. Attempting to retrieve more data.");
}
if (_logger.IsEnabled(LogEventLevel.Debug))
{
_logger.Debug($"{message.ResourceUrl}: Final page was full. Attempting to retrieve more data.");
}

// Looks like there could be more data
offset += limit;
// Looks like there could be more data
offset += limit;

continue;
continue;
}
}
else
{
break;
}

}
catch (RateLimiterRejectedException ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public async Task<IEnumerable<StreamResourcePageMessage<TProcessDataMessage>>> P
$"{message.ResourceUrl}: Retrieving total count of items in change versions {message.ChangeWindow.MinChangeVersion} to {message.ChangeWindow.MaxChangeVersion}.");
}
else
{
{
_logger.Information($"{message.ResourceUrl}: Retrieving total count of items.");
}

Expand All @@ -59,7 +59,7 @@ public async Task<IEnumerable<StreamResourcePageMessage<TProcessDataMessage>>> P
int limit = message.PageSize;

var pageMessages = new List<StreamResourcePageMessage<TProcessDataMessage>>();

if (totalCount > 0)
{
var noOfPartitions = Math.Ceiling((decimal)(message.ChangeWindow.MaxChangeVersion - message.ChangeWindow.MinChangeVersion)
Expand All @@ -72,7 +72,7 @@ public async Task<IEnumerable<StreamResourcePageMessage<TProcessDataMessage>>> P
{
long changeVersionWindowEndValue = (changeVersionWindowStartValue > 0 ?
changeVersionWindowStartValue - 1 : changeVersionWindowStartValue) + options.ChangeVersionPagingWindowSize;

if (changeVersionWindowEndValue > message.ChangeWindow.MaxChangeVersion)
{
changeVersionWindowEndValue = message.ChangeWindow.MaxChangeVersion;
Expand Down Expand Up @@ -134,4 +134,4 @@ public async Task<IEnumerable<StreamResourcePageMessage<TProcessDataMessage>>> P

return pageMessages;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// SPDX-License-Identifier: Apache-2.0
// Licensed to the Ed-Fi Alliance under one or more agreements.
// The Ed-Fi Alliance licenses this file to you under the Apache License, Version 2.0.
// See the LICENSE and NOTICES files in the project root for more information.

using EdFi.Tools.ApiPublisher.Core.Configuration;
using EdFi.Tools.ApiPublisher.Core.Counting;
using EdFi.Tools.ApiPublisher.Core.Processing;
using EdFi.Tools.ApiPublisher.Core.Processing.Handlers;
using EdFi.Tools.ApiPublisher.Core.Processing.Messages;
using Serilog;
using System.Threading.Tasks.Dataflow;

namespace EdFi.Tools.ApiPublisher.Connections.Api.Processing.Source.MessageProducers;

/// <summary>
/// Produces the page messages for streaming a resource by splitting the requested change window
/// into consecutive change-version windows and, inside each window, emitting limit/offset pages
/// from the highest offset down to offset 0 ("reverse paging").
/// </summary>
public class EdFiApiChangeVersionReversePagingStreamResourcePageMessageProducer : IStreamResourcePageMessageProducer
{
    private readonly ISourceTotalCountProvider _sourceTotalCountProvider;

    // Log under this producer's own type so log events are attributed to the right class.
    private readonly ILogger _logger = Log.ForContext(typeof(EdFiApiChangeVersionReversePagingStreamResourcePageMessageProducer));

    /// <summary>
    /// Initializes the producer with the provider used to obtain item counts from the source.
    /// </summary>
    /// <param name="sourceTotalCountProvider">Provider used to count items for a resource and change window.</param>
    public EdFiApiChangeVersionReversePagingStreamResourcePageMessageProducer(ISourceTotalCountProvider sourceTotalCountProvider)
    {
        _sourceTotalCountProvider = sourceTotalCountProvider;
    }

    /// <summary>
    /// Builds the list of page messages for the resource described by <paramref name="message"/>.
    /// </summary>
    /// <param name="message">The resource to stream, including its page size and change window.</param>
    /// <param name="options">Publisher options; <c>ChangeVersionPagingWindowSize</c> sets the width of each change-version window.</param>
    /// <param name="errorHandlingBlock">Block handed to the count provider when it requests totals.</param>
    /// <param name="createProcessDataMessages">Delegate copied onto every produced page message.</param>
    /// <param name="cancellationToken">Token passed through to the count requests.</param>
    /// <returns>
    /// The page messages in production order (windows ascending, pages within a window descending by offset),
    /// or an empty sequence when the overall count request fails or reports no items.
    /// </returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// There are items to page through but <c>ChangeVersionPagingWindowSize</c> is not positive.
    /// </exception>
    public async Task<IEnumerable<StreamResourcePageMessage<TProcessDataMessage>>> ProduceMessagesAsync<TProcessDataMessage>(
        StreamResourceMessage message,
        Options options,
        ITargetBlock<ErrorItemMessage> errorHandlingBlock,
        Func<StreamResourcePageMessage<TProcessDataMessage>, string, IEnumerable<TProcessDataMessage>> createProcessDataMessages,
        CancellationToken cancellationToken)
    {
        if (message.ChangeWindow?.MaxChangeVersion != default(long) && message.ChangeWindow?.MaxChangeVersion != null)
        {
            _logger.Information(
                $"{message.ResourceUrl}: Retrieving total count of items in change versions {message.ChangeWindow.MinChangeVersion} to {message.ChangeWindow.MaxChangeVersion}.");
        }
        else
        {
            _logger.Information($"{message.ResourceUrl}: Retrieving total count of items.");
        }

        // Get total count of items in source resource for change window (if applicable)
        var (totalCountSuccess, totalCount) = await _sourceTotalCountProvider.TryGetTotalCountAsync(
            message.ResourceUrl,
            options,
            message.ChangeWindow,
            errorHandlingBlock,
            cancellationToken);

        if (!totalCountSuccess)
        {
            // Allow processing to continue without performing additional work on this resource.
            return Enumerable.Empty<StreamResourcePageMessage<TProcessDataMessage>>();
        }

        _logger.Information($"{message.ResourceUrl}: Total count = {totalCount}");

        int limit = message.PageSize;

        var pageMessages = new List<StreamResourcePageMessage<TProcessDataMessage>>();

        if (totalCount > 0)
        {
            var windowSize = options.ChangeVersionPagingWindowSize;

            if (windowSize <= 0)
            {
                // A non-positive window can never advance through the change versions.
                throw new ArgumentOutOfRangeException(
                    nameof(options),
                    windowSize,
                    "ChangeVersionPagingWindowSize must be greater than zero.");
            }

            // NOTE(review): ChangeWindow is dereferenced unconditionally from here on, although the
            // logging above tolerates a null ChangeWindow - confirm callers always supply one.
            long maxChangeVersion = message.ChangeWindow.MaxChangeVersion;
            long windowStart = message.ChangeWindow.MinChangeVersion;

            // Walk the inclusive range [MinChangeVersion, MaxChangeVersion] window by window. Driving the
            // loop off the window start (instead of a precomputed partition count) guarantees the final
            // change version is covered and that a failed count request still advances to the next window.
            while (windowStart <= maxChangeVersion)
            {
                long windowEnd = Math.Min(
                    (windowStart > 0 ? windowStart - 1 : windowStart) + windowSize,
                    maxChangeVersion);

                var changeWindow = new ChangeWindow
                {
                    MinChangeVersion = windowStart,
                    MaxChangeVersion = windowEnd
                };

                // The next window begins right after this one's inclusive upper bound.
                windowStart = windowEnd + 1;

                // Get total count of items in source resource for this change-version window
                var (windowCountSuccess, windowCount) = await _sourceTotalCountProvider.TryGetTotalCountAsync(
                    message.ResourceUrl,
                    options,
                    changeWindow,
                    errorHandlingBlock,
                    cancellationToken);

                if (!windowCountSuccess)
                {
                    // Skip this window; the remaining windows are still processed.
                    continue;
                }

                foreach (var (pageOffset, pageLimit) in GetReversePageRanges(windowCount, limit))
                {
                    pageMessages.Add(
                        new StreamResourcePageMessage<TProcessDataMessage>
                        {
                            // Resource-specific context
                            ResourceUrl = message.ResourceUrl,
                            PostAuthorizationFailureRetry = message.PostAuthorizationFailureRetry,

                            // Page-strategy specific context
                            Limit = pageLimit,
                            Offset = pageOffset,

                            // Global processing context
                            ChangeWindow = changeWindow,
                            CreateProcessDataMessages = createProcessDataMessages,

                            CancellationSource = message.CancellationSource,
                        });
                }
            }
        }

        // Flag the last page for special "continuation" processing
        if (pageMessages.Count > 0)
        {
            // Page-strategy specific context
            pageMessages[pageMessages.Count - 1].IsFinalPage = true;
        }

        return pageMessages;
    }

    /// <summary>
    /// Yields the (offset, limit) pairs that cover <paramref name="totalCount"/> items from the
    /// highest offset down to offset 0; the page at offset 0 is shortened to the leftover items.
    /// Yields nothing when either argument is not positive.
    /// </summary>
    private static IEnumerable<(long Offset, int Limit)> GetReversePageRanges(long totalCount, int pageSize)
    {
        if (totalCount <= 0 || pageSize <= 0)
        {
            yield break;
        }

        long offset = totalCount - pageSize;

        // Full pages, starting from the end of the result set.
        while (offset > 0)
        {
            yield return (offset, pageSize);
            offset -= pageSize;
        }

        // Here offset is in (-pageSize, 0], so pageSize + offset is the number of items
        // left at the front of the result set (between 1 and pageSize).
        yield return (0L, (int)(pageSize + offset));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,7 @@ public int MaxDegreeOfParallelismForPostResourceItem
public int RateLimitNumberExecutions { get; set; } = 100;

public int RateLimitTimeLimitMinutes { get; set; } = 1;

public bool useReversePaging { get; set; } = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming convention: public properties should use PascalCase (i.e., `UseReversePaging` rather than `useReversePaging`).

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public IConfigurationBuilder Create(string[] commandLineArgs)
["--enableRateLimit"] = "Options:EnableRateLimit",
["--rateLimitNumberExecutions"] = "Options:RateLimitNumberExecutions",
["--rateLimitTimeLimitMinutes"] = "Options:RateLimitTimeLimitMinutes",
["--useReversePaging"] = "Options:UseReversePaging",

// Resource selection (comma delimited paths - e.g. "/ed-fi/students,/ed-fi/studentSchoolAssociations")
["--include"] = "Connections:Source:Include",
Expand Down
11 changes: 8 additions & 3 deletions src/EdFi.Tools.ApiPublisher.Tests/Helpers/TestHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ public static ChangeProcessor CreateChangeProcessorWithDefaultDependencies(
IFakeHttpRequestHandler fakeSourceRequestHandler,
ApiConnectionDetails targetApiConnectionDetails,
IFakeHttpRequestHandler fakeTargetRequestHandler,
INodeJSService nodeJsService = null)
INodeJSService nodeJsService = null,
bool withReversePaging = false)
{
EdFiApiClient SourceApiClientFactory() =>
new EdFiApiClient(
Expand Down Expand Up @@ -300,8 +301,12 @@ EdFiApiClient TargetApiClientFactory() =>

var streamingResourceProcessor = new StreamingResourceProcessor(
new StreamResourceBlockFactory(
new EdFiApiLimitOffsetPagingStreamResourcePageMessageProducer(
new EdFiApiSourceTotalCountProvider(sourceEdFiApiClientProvider))),
(withReversePaging) ?
new EdFiApiChangeVersionReversePagingStreamResourcePageMessageProducer(
new EdFiApiSourceTotalCountProvider(sourceEdFiApiClientProvider)) :
new EdFiApiLimitOffsetPagingStreamResourcePageMessageProducer(
new EdFiApiSourceTotalCountProvider(sourceEdFiApiClientProvider))
),
new StreamResourcePagesBlockFactory(new EdFiApiStreamResourcePageMessageHandler(sourceEdFiApiClientProvider)),
sourceApiConnectionDetails);

Expand Down
Loading
Loading