Skip to content

Commit

Permalink
Fix InsertAsyncCreateMissing (#187)
Browse files Browse the repository at this point in the history
* Fix InsertAsyncCreateMissing

Also remove the VerifyDatapointsDeletion method.

The issue was that we want to check whether creating timeseries failed,
but we looked at the wrong result. Simple fix.

* Merge results
  • Loading branch information
einarmo authored Aug 5, 2022
1 parent 0deab7e commit 8d35465
Showing 1 changed file with 3 additions and 60 deletions.
63 changes: 3 additions & 60 deletions Cognite.Extensions/TimeSeries/DataPointExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,15 @@ public static DataPointInsertionRequest ToInsertRequest(this IDictionary<Identit
sanitationMode,
token).ConfigureAwait(false);

if (result.Errors.Any(err => err.Type != ErrorType.ItemExists)) return (result, tsResult);
if (tsResult.Errors.Any(err => err.Type != ErrorType.ItemExists)) return (result, tsResult);

var pointsToInsert = points.Where(kvp => missingIds.Contains(kvp.Key)).ToDictionary(kvp => kvp.Key, kvp => kvp.Value);

result = await InsertAsync(client, pointsToInsert, keyChunkSize, valueChunkSize, throttleSize,
var result2 = await InsertAsync(client, pointsToInsert, keyChunkSize, valueChunkSize, throttleSize,
timeseriesChunkSize, timeseriesThrottleSize, gzipCountLimit, sanitationMode,
RetryMode.OnError, nanReplacement, token).ConfigureAwait(false);

return (result, tsResult);
return (result.Merge(result2), tsResult);
}


Expand Down Expand Up @@ -431,63 +431,6 @@ private static async Task<HashSet<Identity>> DeleteDataPointsIgnoreErrorsChunk(
return missing;
}

private static async Task<HashSet<Identity>> VerifyDataPointsDeletion(
DataPointsResource dataPoints,
IEnumerable<DataPointsQueryItem> query,
CancellationToken token)
{
int count = 1;
var dataPointsQuery = new DataPointsQuery()
{
Items = query
};

int tries = 0;
var notVerified = new HashSet<Identity>();
while (count > 0 && tries < _maxNumOfVerifyRequests)
{
notVerified.Clear();
DataPointListResponse results;
using (CdfMetrics.Datapoints.WithLabels("list").NewTimer())
{
results = await dataPoints.ListAsync(dataPointsQuery, token).ConfigureAwait(false);
}
count = 0;
var queries = dataPointsQuery.Items.ToList();
var remaining = new List<DataPointsQueryItem>();
for (int i = 0; i < results.Items.Count; ++i)
{
// The query and response have items in the same order
var q = queries[i];
var itemCount = results.Items[i].NumericDatapoints?.Datapoints.Count ?? 0 + results.Items[i].StringDatapoints?.Datapoints.Count ?? 0;
if (itemCount > 0)
{
var id = q.Id.HasValue ? new Identity(q.Id.Value) : new Identity(q.ExternalId);
notVerified.Add(id);
remaining.Add(q);
}
count += itemCount;
}
if (count > 0)
{
dataPointsQuery.Items = remaining;
tries++;
_logger.LogDebug("Could not verify the deletion of data points in {Count}/{Total} time series. Retrying in 500ms", count, query.Count());
await Task.Delay(500, token).ConfigureAwait(false);
}
}
if (tries == _maxNumOfVerifyRequests && count > 0)
{
_logger.LogWarning("Failed to verify the deletion of data points after {NumAttempts} attempts. Ids: {Ids}",
_maxNumOfVerifyRequests,
dataPointsQuery.Items.Select(q => q.Id.HasValue ? q.Id.ToString() : q.ExternalId.ToString()));
}
else
{
notVerified.Clear();
}
return notVerified;
}
/// <summary>
/// Get the last timestamp for each time series given in <paramref name="ids"/> before each given timestamp.
/// Ignores timeseries not in CDF. The return dictionary contains only ids that exist in CDF.
Expand Down

0 comments on commit 8d35465

Please sign in to comment.