docs: add parallel failure recovery example, nudge users to cache client and index, and write detailed guidance on transport selection
neon-sunset committed Sep 27, 2024
1 parent 7aa2b07 commit e104210
Showing 3 changed files with 118 additions and 9 deletions.
102 changes: 97 additions & 5 deletions README.md
@@ -58,7 +58,7 @@ var vectors = new[]
new Vector
{
Id = "vector1",
Values = new float[] { 0.1f, 0.2f, 0.3f, ... },
Metadata = new MetadataMap
{
["genre"] = "horror",
@@ -94,10 +94,10 @@ await index.Delete(new[] { "vector1" });
// Delete vectors by metadata filter
await index.Delete(new MetadataMap
{
    ["genre"] = new MetadataMap
    {
        ["$in"] = new[] { "documentary", "action" }
    }
});

// Delete all vectors in the index
@@ -123,6 +123,98 @@ var details = await pinecone.DescribeCollection("myCollection");
await pinecone.DeleteCollection("myCollection");
```

## Advanced
### Recovering from failures on batched parallel upsert
```csharp
// Upsert with recovery from up to three failures on batched parallel upsert.
//
// The parallelization is done automatically by the client based on the vector
// dimension and the number of vectors to upsert. It aims to keep the individual
// request size below Pinecone's 2MiB limit with some safety margin for metadata.
// This behavior can be further controlled by calling the 'Upsert' overload with
// custom values for 'batchSize' and 'parallelism' parameters.
//
// This is not the most allocation-efficient implementation, which matters in
// scenarios sensitive to GC pause frequency, but it is perfectly acceptable
// for pretty much all regular back-end applications.
// Assuming there is an instance of 'index' available
// Generate 25k random vectors
var vectors = Enumerable
.Range(0, 25_000)
.Select(_ => new Vector
{
Id = Guid.NewGuid().ToString(),
Values = Enumerable
.Range(0, 1536)
.Select(_ => Random.Shared.NextSingle())
.ToArray()
})
.ToArray();

// Specify the retry limit we are okay with
var retries = 3;
do
{
try
{
// Perform the upsert
await index.Upsert(vectors);
// If no exception is thrown, break out of the retry loop
break;
}
catch (ParallelUpsertException e) when (retries-- > 0)
{
        // Create a hash set to efficiently look up the IDs of the failed vectors
        var filter = e.FailedBatchVectorIds.ToHashSet();
        // Keep only the vectors that failed to upsert, so the 'Upsert'
        // call above retries just those on the next iteration
        vectors = vectors.Where(v => filter.Contains(v.Id)).ToArray();
Console.WriteLine($"Retrying upsert due to error: {e.Message}");
}
} while (true); // once retries are exhausted, the 'when' filter stops catching and the exception propagates
```
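To illustrate the automatic batch sizing mentioned in the comments above: a simplified sketch of the idea, not the client's actual implementation. The 2 MiB limit comes from the text; the concrete safety margin chosen here is an assumption.

```csharp
// Rough sketch of size-based batching (not the client's actual code).
// Each float32 value takes 4 bytes on the wire; we reserve an assumed
// safety margin for metadata and protocol framing.
const int MaxRequestBytes = 2 * 1024 * 1024; // Pinecone's 2 MiB request limit
const int SafetyMarginBytes = 512 * 1024;    // assumed margin for metadata

static int EstimateBatchSize(int dimension)
{
    var bytesPerVector = dimension * sizeof(float);
    return Math.Max(1, (MaxRequestBytes - SafetyMarginBytes) / bytesPerVector);
}

// With these assumed numbers, 1536-dimensional vectors yield 256 vectors
// per batch, so the 25k vectors above would be split into ~98 requests.
Console.WriteLine(EstimateBatchSize(1536));
```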

A similar approach can be used to recover from failures in other streamed or batched operations.
See `ListOperationException`, `ParallelFetchException` and `ParallelDeleteException` in [VectorTypes.cs](src/Types/VectorTypes.cs#L192-L282).

### REST vs gRPC transport

Prefer `RestTransport` by default. Detailed guidance for specific scenarios follows below.
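For context, the transport is selected when retrieving the index handle. A sketch, assuming a `GetIndex` overload with a transport type parameter; verify against the client's actual signature:

```csharp
using Pinecone;

var pinecone = new PineconeClient("your-api-key");

// REST transport (recommended default)
var index = await pinecone.GetIndex("my-index");

// gRPC transport, for the niche scenarios described below
var grpcIndex = await pinecone.GetIndex<GrpcTransport>("my-index");
```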

#### Low-throughput bandwidth minimization

`GrpcTransport` is a viable alternative for reducing network traffic when working with large vectors under low to moderate throughput scenarios.
Protobuf encodes vectors in a much more compact manner, so if you have high-dimensional vectors (1536-3072+), a low degree of request concurrency, and usually upsert or fetch vectors in small batches, it is worth considering `GrpcTransport` for your use case.

Theoretically, low-concurrency throughput may be higher with `GrpcTransport` due to the reduced network traffic, but because of how trivial it is to dispatch multiple requests in parallel (and the fact that `Fetch`, `Upsert` and `Delete` do so automatically), the advantages of this approach are likely to be limited.
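As a back-of-the-envelope illustration of the bandwidth difference: the per-value figures below are assumptions for illustration only; actual wire sizes depend on field overhead, JSON formatting, and compression.

```csharp
// Rough per-vector payload estimate for a 1536-dimensional vector.
// Protobuf encodes each float as a fixed 4 bytes (plus small field
// overhead), while JSON spells out the decimal text, roughly 11
// characters per value including the separator (assumed average).
const int Dimension = 1536;

var protobufBytes = Dimension * 4;  // 6144 bytes, about 6 KiB
var jsonBytes = Dimension * 11;     // 16896 bytes, about 16.5 KiB

Console.WriteLine($"protobuf ≈ {protobufBytes / 1024.0:F1} KiB, JSON ≈ {jsonBytes / 1024.0:F1} KiB");
```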

#### High concurrency querying, high-throughput vector fetching and upserting

At the time of writing, Pinecone's HTTP/2 stack is configured to allow only a few (or even just one) concurrent streams per HTTP/2 connection.
Because HTTP/2 is mandatory for gRPC, this causes significant request queuing on a gRPC channel under high-concurrency scenarios, resulting in poor scalability of `GrpcTransport` and a low throughput ceiling.

Users who are unaware of this limitation may experience unexpected latency increases in their query operations as the user count per application node grows, and/or much lower than expected upsert and fetch throughput even when manually specifying a greater degree of parallelism for `Upsert` and `Fetch` operations.

`Pinecone.NET` partially mitigates this issue by configuring the gRPC channel to use client-side (DNS-based) load balancing across multiple subchannels (currently 3, as returned by the DNS query), which is still expected to provide better throughput than other clients. It also enables the use of multiple HTTP/2 connections per endpoint for the underlying `SocketsHttpHandler`, but the current gRPC implementation does not seem to properly take advantage of this.
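The `SocketsHttpHandler` setting mentioned above is a standard .NET knob. A minimal sketch of applying it to a gRPC channel follows; the property names are real .NET and `grpc-dotnet` APIs, but the wiring and address are illustrative, not the client's internal code:

```csharp
using System.Net.Http;
using Grpc.Net.Client;

// Allow the handler to open additional HTTP/2 connections when the
// concurrent stream limit of an existing connection is reached.
var handler = new SocketsHttpHandler
{
    EnableMultipleHttp2Connections = true
};

var channel = GrpcChannel.ForAddress("https://my-index-host", new GrpcChannelOptions
{
    HttpHandler = handler
});
```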

These observations come from load-testing the client, and additional infrastructure factors might be at play, as indicated by user reports on Pinecone's community forum regarding scalability in other implementations.

Expert users who still wish to use the gRPC transport in high-load scenarios may want to explore further action items, which are out of scope for this simple community-supported library:

- Implementing a custom subchannel balancer that forces per-connection scaling to match maximum request concurrency
- Investigating _potential_ Pinecone-side client throttling mechanism that prevents efficient pooling of multiple gRPC channels
- Implementing a simplified but completely custom gRPC transport that uses its own connection, channel and subchannel management, completely bypassing the existing gRPC implementation

Regular users are advised to use `RestTransport` for high-throughput and/or high-concurrency scenarios instead, unless evaluating `GrpcTransport` in their specific environment produces better results.

### Reducing application memory usage and GC pressure

At the time of writing, `RestTransport` and `System.Text.Json`, which it uses for serialization, appear to be more memory-efficient than `GrpcTransport` and Protobuf. This is not an inherent limitation of gRPC, but rather a result of the current implementation of `grpc-dotnet`. `System.Text.Json` is heavily optimized with regard to allocation traffic and results in significantly lower sustained memory usage under both light and heavy load. Given the current state of the `grpc-dotnet` implementation, I do not anticipate this changing in the near future. It is "good enough" for most applications, but the sustained heap size difference under load is significant enough to warrant stating this explicitly.

Note that `Pinecone.NET` already performs zero-copy construction and reading of the `RepeatedField<float>` objects that store vector values to alleviate allocation pressure, but this is not enough to offset the advantage of plain `System.Text.Json` serialization.

## Contributing

Contributions are welcome! Feel free to open an issue or a PR.
15 changes: 13 additions & 2 deletions src/Index.cs
@@ -48,6 +48,11 @@ public sealed partial record Index<
/// <summary>
/// An object used for interacting with vectors. It is used to upsert, query, fetch, update, delete and list vectors, as well as retrieving index statistics.
/// </summary>
/// <remarks>
/// The <see cref="Index{T}"/> abstraction is thread-safe and can be shared across multiple threads.
/// It is strongly recommended to cache and reuse the <see cref="Index{T}"/> object, for example by registering it as a singleton in a DI container.
/// If not, make sure to dispose the <see cref="Index{T}"/> when it is no longer needed.
/// </remarks>
/// <typeparam name="TTransport">The type of transport layer used.</typeparam>
public sealed partial record Index<TTransport> : IDisposable
where TTransport : ITransport<TTransport>
@@ -87,7 +92,11 @@ public Task<IndexStats> DescribeStats(MetadataMap? filter = null, CancellationTo
/// <summary>
/// Searches an index using the values of a vector with specified ID. It retrieves the IDs of the most similar items, along with their similarity scores.
/// </summary>
/// <remarks>
/// Query by ID uses Approximate Nearest Neighbor, which doesn't guarantee the input vector to appear in the results. To ensure that, use the Fetch operation instead.
/// <para/>
/// If you do not need to include vector values in the response, set <paramref name="includeValues"/> to <c>false</c> to reduce the response size and read units consumption for the operation.
/// </remarks>
/// <param name="id">The unique ID of the vector to be used as a query vector.</param>
/// <param name="topK">The number of results to return for each query.</param>
/// <param name="filter">The filter to apply.</param>
@@ -119,14 +128,16 @@ public Task<ScoredVector[]> Query(
/// <summary>
/// Searches an index using the specified vector values. It retrieves the IDs of the most similar items, along with their similarity scores.
/// </summary>
/// <remarks>
/// If you do not need to include vector values in the response, set <paramref name="includeValues"/> to <c>false</c> to reduce the response size and read units consumption for the operation.
/// </remarks>
/// <param name="values">The query vector. This should be the same length as the dimension of the index being queried.</param>
/// <param name="sparseValues">Vector sparse data. Represented as a list of indices and a list of corresponded values, which must be with the same length.</param>
/// <param name="topK">The number of results to return for each query.</param>
/// <param name="filter">The filter to apply.</param>
/// <param name="indexNamespace">Namespace to query from. If no namespace is provided, the operation applies to all namespaces.</param>
/// <param name="includeValues">Indicates whether vector values are included in the response.</param>
/// <param name="includeMetadata">Indicates whether metadata is included in the response as well as the IDs.</param>
/// <returns></returns>
public Task<ScoredVector[]> Query(
ReadOnlyMemory<float> values,
uint topK,
10 changes: 8 additions & 2 deletions src/PineconeClient.cs
@@ -10,6 +10,11 @@ namespace Pinecone;
/// <summary>
/// Main entry point for interacting with Pinecone. It is used to create, delete and modify indexes.
/// </summary>
/// <remarks>
/// The <see cref="PineconeClient" /> abstraction is thread-safe and can be shared across multiple threads.
/// Consider caching and reusing the <see cref="PineconeClient" /> object, for example by registering it as a singleton in a DI container.
/// If not, make sure to dispose the <see cref="PineconeClient" /> when it is no longer needed.
/// </remarks>
public sealed class PineconeClient : IDisposable
{
readonly HttpClient Http;
@@ -46,13 +51,14 @@ public PineconeClient(string apiKey, Uri baseUrl, ILoggerFactory? loggerFactory
/// <summary>
/// Creates a new instance of the <see cref="PineconeClient" /> class.
/// </summary>
/// <param name="apiKey">API key used to connect to Pinecone.</param>
/// <param name="client">HTTP client used to connect to Pinecone.</param>
/// <param name="loggerFactory">The logger factory to be used.</param>
public PineconeClient(string apiKey, HttpClient client, ILoggerFactory? loggerFactory = null)
{
ThrowHelpers.CheckNullOrWhiteSpace(apiKey);
ThrowHelpers.CheckNull(client);
ThrowHelpers.CheckNull(client.BaseAddress);

Http = client;
Http.AddPineconeHeaders(apiKey);
