Skip to content

Commit

Permalink
Feature/deep insert async (#2716)
Browse files Browse the repository at this point in the history
  • Loading branch information
KenitoInc authored Sep 5, 2023
1 parent 33d2b2b commit c4998cf
Show file tree
Hide file tree
Showing 9 changed files with 414 additions and 203 deletions.
2 changes: 1 addition & 1 deletion src/Microsoft.OData.Client/BaseSaveResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ protected override void AsyncEndGetResponse(IAsyncResult asyncResult)
{
this.HandleOperationResponse(responseMessage);

if (!Util.IsBulkUpdate(this.Options))
if (!Util.IsBulkUpdate(this.Options) && !Util.IsDeepInsert(this.Options))
{
this.HandleOperationResponseHeaders((HttpStatusCode)responseMessage.StatusCode, new HeaderCollection(responseMessage));
}
Expand Down
52 changes: 50 additions & 2 deletions src/Microsoft.OData.Client/DataServiceContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2339,7 +2339,7 @@ internal virtual DataServiceResponse EndBulkUpdate(IAsyncResult asyncResult)

#endregion

#region DeepInsert
#region DeepInsert, DeepInsertAsync, BeginDeepInsert, EndDeepInsert

/// <summary>
/// Processes deep insert requests. Creates a resource and its related resources or creates a resource and links it to its related existing resources in a single request.
Expand All @@ -2353,12 +2353,60 @@ public virtual DataServiceResponse DeepInsert<T>(T resource)
throw Error.ArgumentNull(nameof(resource));
}

DeepInsertSaveResult result = new DeepInsertSaveResult(this, Util.SaveChangesMethodName, SaveChangesOptions.DeepInsert, callback: null, state: null);
DeepInsertSaveResult result = new DeepInsertSaveResult(this, Util.DeepInsertMethodName, SaveChangesOptions.DeepInsert, callback: null, state: null);
result.DeepInsertRequest(resource);

return result.EndRequest();
}

/// <summary>
/// Asynchronously processes a deep insert request.
/// </summary>
/// <typeparam name="T">The type of top-level object to be deep inserted.</typeparam>
/// <param name="resource">The top-level object of the type to be deep inserted.</param>
/// <returns>A task representing the <see cref="DataServiceResponse"/> that holds the result of the deep insert operation.</returns>
public virtual Task<DataServiceResponse> DeepInsertAsync<T>(T resource)
{
return this.DeepInsertAsync(resource, CancellationToken.None);
}

/// <summary>
/// Asynchronously processes a deep insert request.
/// </summary>
/// <typeparam name="T">The type of top-level object to be deep inserted.</typeparam>
/// <param name="resource">The top-level object of the type to be deep inserted.</param>
/// <returns>A task representing the <see cref="DataServiceResponse"/> that holds the result of the deep insert operation.</returns>
public virtual Task<DataServiceResponse> DeepInsertAsync<T>(T resource, CancellationToken cancellationToken)
{
return FromAsync((objectsArg, callback, state) => BeginDeepInsert(callback, state, objectsArg), EndDeepInsert, resource, cancellationToken);
}

/// <summary>Asynchronously submits top-level objects to be deep inserted to the data service.</summary>
/// <param name="callback">The delegate that is called when a response to the deep insert request is received.</param>
/// <param name="state">User-defined state object that is used to pass context data to the callback method.</param>
/// <returns>An<see cref="IAsyncResult" /> object that is used to track the status of the asynchronous operation.</returns>
public virtual IAsyncResult BeginDeepInsert<T>(AsyncCallback callback, object state, T resource)
{
if (resource == null)
{
throw Error.ArgumentNull(nameof(resource));
}

DeepInsertSaveResult result = new DeepInsertSaveResult(this, Util.DeepInsertMethodName, SaveChangesOptions.DeepInsert, callback, state);
result.BeginDeepInsertRequest(resource);

return result;
}

/// <summary>Called to complete the <see cref="BeginDeepInsert{T}(AsyncCallback, object, T)"/>.</summary>
/// <param name="asyncResult">An <see cref="IAsyncResult" /> that represents the status of the asynchronous operation.</param>
/// <returns>The DataServiceResponse object that holds the result of the deep insert operation.</returns>
public virtual DataServiceResponse EndDeepInsert(IAsyncResult asyncResult)
{
DeepInsertSaveResult result = BaseAsyncResult.EndExecute<DeepInsertSaveResult>(this, Util.DeepInsertMethodName, asyncResult);
return result.EndRequest();
}

#endregion

#region Add, Attach, Delete, Detach, Update, TryGetEntity, TryGetUri
Expand Down
67 changes: 67 additions & 0 deletions src/Microsoft.OData.Client/DeepInsertSaveResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.IO;
using System.Linq;
Expand All @@ -20,6 +21,7 @@ namespace Microsoft.OData.Client
/// <summary>
/// Handles the deep insert requests and responses (both sync and async).
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable", Justification = "The response stream is disposed by the message reader we create over it which we dispose inside the enumerator.")]
internal class DeepInsertSaveResult : BaseSaveResult
{
#region Private Fields
Expand Down Expand Up @@ -118,6 +120,52 @@ internal void DeepInsertRequest<T>(T resource)
}
}

/// <summary>
/// Begins an asynchronous deep insert request.
/// </summary>
/// <typeparam name="T">The type of the top-level object to be deep inserted.</typeparam>
/// <param name="resource">The top-level object of the type to be deep inserted.</param>
internal void BeginDeepInsertRequest<T>(T resource)
{
if (resource == null)
{
throw Error.ArgumentNull(nameof(resource));
}

PerRequest peReq = null;

try
{
BuildDescriptorGraph(this.ChangedEntries, true, resource);
ODataRequestMessageWrapper deepInsertRequestMessage = this.GenerateDeepInsertRequest();
this.Abortable = deepInsertRequestMessage;

deepInsertRequestMessage.SetContentLengthHeader();
this.perRequest = peReq = new PerRequest();
peReq.Request = deepInsertRequestMessage;
peReq.RequestContentStream = deepInsertRequestMessage.CachedRequestStream;

AsyncStateBag asyncStateBag = new AsyncStateBag(peReq);

this.responseStream = new MemoryStream();

IAsyncResult asyncResult = BaseAsyncResult.InvokeAsync(deepInsertRequestMessage.BeginGetRequestStream, this.AsyncEndGetRequestStream, asyncStateBag);

peReq.SetRequestCompletedSynchronously(asyncResult.CompletedSynchronously);
}
catch (Exception e)
{
this.HandleFailure(peReq, e);
throw;
}
finally
{
this.HandleCompleted(peReq);
}

Debug.Assert((this.CompletedSynchronously && this.IsCompleted) || !this.CompletedSynchronously, "sync without complete");
}

/// <summary>
/// This method processes all the changed descriptors in the entity tracker.
/// It loops through all the descriptors and creates relationships between the descriptors if any.
Expand Down Expand Up @@ -268,6 +316,25 @@ protected override ODataRequestMessageWrapper CreateRequestMessage(string method
return this.CreateTopLevelRequest(method, requestUri, headers, httpStack, descriptor);
}

/// <summary>Read and store response data for the current change</summary>
/// <param name="peReq">The completed <see cref="BaseAsyncResult.PerRequest"/> object</param>
/// <remarks>This is called only from the async code paths, when the response to the deep insert request has been read fully.</remarks>
protected override void FinishCurrentChange(PerRequest peReq)
{
base.FinishCurrentChange(peReq);

// This resets the position in the buffered response stream to the beginning
// so that we can start reading the response.
// In this case the ResponseStream is always a MemoryStream since we cache the async response.
if (this.responseStream.Position != 0)
{
this.responseStream.Position = 0;
}

this.perRequest = null;
this.SetCompleted();
}

/// <summary>
/// Gets the materializer state to process the response.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
virtual Microsoft.OData.Client.DataServiceContext.DeepInsert<T>(T resource) -> Microsoft.OData.Client.DataServiceResponse
virtual Microsoft.OData.Client.DataServiceContext.BeginDeepInsert<T>(System.AsyncCallback callback, object state, T resource) -> System.IAsyncResult
virtual Microsoft.OData.Client.DataServiceContext.DeepInsert<T>(T resource) -> Microsoft.OData.Client.DataServiceResponse
virtual Microsoft.OData.Client.DataServiceContext.DeepInsertAsync<T>(T resource) -> System.Threading.Tasks.Task<Microsoft.OData.Client.DataServiceResponse>
virtual Microsoft.OData.Client.DataServiceContext.DeepInsertAsync<T>(T resource, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task<Microsoft.OData.Client.DataServiceResponse>
virtual Microsoft.OData.Client.DataServiceContext.EndDeepInsert(System.IAsyncResult asyncResult) -> Microsoft.OData.Client.DataServiceResponse
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
virtual Microsoft.OData.Client.DataServiceContext.DeepInsert<T>(T resource) -> Microsoft.OData.Client.DataServiceResponse
virtual Microsoft.OData.Client.DataServiceContext.BeginDeepInsert<T>(System.AsyncCallback callback, object state, T resource) -> System.IAsyncResult
virtual Microsoft.OData.Client.DataServiceContext.DeepInsert<T>(T resource) -> Microsoft.OData.Client.DataServiceResponse
virtual Microsoft.OData.Client.DataServiceContext.DeepInsertAsync<T>(T resource) -> System.Threading.Tasks.Task<Microsoft.OData.Client.DataServiceResponse>
virtual Microsoft.OData.Client.DataServiceContext.DeepInsertAsync<T>(T resource, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task<Microsoft.OData.Client.DataServiceResponse>
virtual Microsoft.OData.Client.DataServiceContext.EndDeepInsert(System.IAsyncResult asyncResult) -> Microsoft.OData.Client.DataServiceResponse
3 changes: 3 additions & 0 deletions src/Microsoft.OData.Client/Util.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ internal static class Util
/// <summary>Method name for the BulkUpdate method.</summary>
internal const string BulkUpdateMethodName = "BulkUpdate";

/// <summary>Method name for the DeepInsert method.</summary>
internal const string DeepInsertMethodName = "DeepInsert";

/// <summary>
/// The number of components of version.
/// </summary>
Expand Down
Loading

0 comments on commit c4998cf

Please sign in to comment.