Skip to content

Commit

Permalink
perf!: switch to RestTransport as the default transport type for GetI…
Browse files Browse the repository at this point in the history
…ndex

- BREAKING CHANGE: generic argumentless GetIndex now returns Index<RestTransport>, the motivation for this is upon deeper evaluation and load-test of the GrpcTransport it was discovered that the underlying protobuf serializer is less memory-efficient than System.Text.Json and ends up often rooting large arrays in ArrayPool, especially under batched parallel upserts, leading to much larger LOH size. In addition to that, Pinecone infra team seems to configured their ingress with a very low amount of allowed concurrent streams per HTTP/2 connection. As a result, even with client-side load balancing, gRPC ends up having much worse throughput than using plain HTTP/1.1 and JSON. Shameful. Therefore, RestTransport currently appears to be a more user-friendly default, and advanced users can always choose a different one if they evaluate that gRPC serves their needs better. TODO: add performance guidance to README
- Cache boolean boxes for MetadataValue
- Make sure to test both transport types
- F# example formatting
  • Loading branch information
neon-sunset committed Sep 25, 2024
1 parent c11c916 commit e00bfe4
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 26 deletions.
10 changes: 5 additions & 5 deletions example/Example.FSharp/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ open System.Collections.Generic
open Pinecone

let createMetadata x =
MetadataMap(x |> Seq.map (fun (k, m) -> KeyValuePair(k,m) ))
MetadataMap(x |> Seq.map (fun (k, m) -> KeyValuePair(k, m) ))

let getRandomVector size =
Array.init size (fun _ -> Random.Shared.NextSingle())
Expand All @@ -27,17 +27,17 @@ let main = task {
use! index = pinecone.GetIndex(indexName)

let tags = [|"tag1" ; "tag2"|]
let first = Vector(Id = "first", Values = getRandomVector 1536, Metadata = createMetadata["new", true; "price", 50; "tags", tags])
let second = Vector(Id = "second", Values = getRandomVector 1536, Metadata = createMetadata["price", 50])
let first = Vector(Id = "first", Values = getRandomVector 1536, Metadata = createMetadata ["new", true; "price", 50; "tags", tags])
let second = Vector(Id = "second", Values = getRandomVector 1536, Metadata = createMetadata ["price", 50])

// Upsert vectors into the index
let! _ = index.Upsert [|first; second|]

// Partially update a vector (allows to update dense/sparse/metadata properties only)
do! index.Update("second", metadata = createMetadata["price", 99])
do! index.Update("second", metadata = createMetadata ["price", 99])

// Specify metadata filter to query the index with
let priceRange = createMetadata["price", createMetadata["$gte", 75; "$lte", 125]]
let priceRange = createMetadata ["price", createMetadata ["$gte", 75; "$lte", 125]]

// Query the index by embedding and metadata filter
let! results = index.Query(getRandomVector 1536, 3u, filter = priceRange, includeMetadata = true)
Expand Down
16 changes: 13 additions & 3 deletions src/PineconeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,18 +142,28 @@ private async Task CreateIndex(CreateIndexRequest request, CancellationToken ct
}

/// <summary>
/// Creates an <see cref="Index{GrpcTransport}"/> object describing the index. It is a main entry point for interacting with vectors.
/// Creates an <see cref="Index{RestTransport}"/> object describing the index. It is the main entry point for interacting with vectors.
/// It is used to upsert, query, fetch, update, delete and list vectors, as well as retrieving index statistics.
/// </summary>
/// <remarks>
/// The <see cref="Index{T}"/> abstraction is thread-safe and can be shared across multiple threads.
/// It is strongly recommended to cache and reuse the <see cref="Index{T}"/> object, for example by registering it as a singleton in a DI container.
/// If not, make sure to dispose the <see cref="Index{T}"/> when it is no longer needed.
/// </remarks>
/// <param name="name">Name of the index to describe.</param>
/// <param name="ct">A <see cref="CancellationToken" /> to observe while waiting for the task to complete.</param>
/// <returns><see cref="Index{GrpcTransport}"/> describing the index.</returns>
public Task<Index<GrpcTransport>> GetIndex(string name, CancellationToken ct = default) => GetIndex<GrpcTransport>(name, ct);
/// <returns><see cref="Index{RestTransport}"/> describing the index.</returns>
public Task<Index<RestTransport>> GetIndex(string name, CancellationToken ct = default) => GetIndex<RestTransport>(name, ct);

/// <summary>
/// Creates an <see cref="Index{TTransport}"/> object describing the index. It is a main entry point for interacting with vectors.
/// It is used to upsert, query, fetch, update, delete and list vectors, as well as retrieving index statistics.
/// </summary>
/// <remarks>
/// The <see cref="Index{T}"/> abstraction is thread-safe and can be shared across multiple threads.
/// It is strongly recommended to cache and reuse the <see cref="Index{T}"/> object, for example by registering it as a singleton in a DI container.
/// If not, make sure to dispose the <see cref="Index{T}"/> when it is no longer needed.
/// </remarks>
/// <typeparam name="TTransport">The type of transport layer used, either <see cref="GrpcTransport"/> or <see cref="RestTransport"/>.</typeparam>
/// <param name="name">Name of the index to describe.</param>
/// <param name="ct">A <see cref="CancellationToken" /> to observe while waiting for the task to complete.</param>
Expand Down
5 changes: 4 additions & 1 deletion src/Types/VectorTypes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ public MetadataMap(IEnumerable<KeyValuePair<string, MetadataValue>> collection)
[JsonConverter(typeof(MetadataValueConverter))]
public readonly record struct MetadataValue
{
static readonly object True = true;
static readonly object False = false;

/// <summary>
/// Metadata value stored.
/// </summary>
Expand Down Expand Up @@ -158,7 +161,7 @@ public static bool TryCreate<T>(T? value, out MetadataValue metadataValue)
public override string ToString() => Inner?.ToString() ?? "null";

// Main supported types
public static implicit operator MetadataValue(bool value) => new(value);
public static implicit operator MetadataValue(bool value) => new(value ? True : False);
public static implicit operator MetadataValue(string? value) => new(value);
public static implicit operator MetadataValue(int value) => new((double)value);
public static implicit operator MetadataValue(long value) => new((double)value);
Expand Down
17 changes: 9 additions & 8 deletions test/DataTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

namespace PineconeTests;

public abstract class DataTestBase<TFixture>(TFixture fixture) : IClassFixture<TFixture>
where TFixture: DataTestFixtureBase
public abstract class DataTestBase<TFixture, TTransport>(TFixture fixture) : IClassFixture<TFixture>
where TFixture: DataTestFixtureBase<TTransport>
where TTransport: ITransport<TTransport>
{
protected TFixture Fixture { get; } = fixture;

Expand Down Expand Up @@ -211,11 +212,11 @@ public async Task Basic_vector_upsert_update_delete()
var attemptCount = 0;
do
{
await Task.Delay(DataTestFixtureBase.DelayInterval);
await Task.Delay(DataTestFixtureBase<TTransport>.DelayInterval);
attemptCount++;
var finalFetch = await Fixture.Index.Fetch(["update-vector-id-2"], testNamespace);
updatedVector = finalFetch["update-vector-id-2"];
} while (updatedVector.Values.Span[0] != 23 && attemptCount < DataTestFixtureBase.MaxAttemptCount);
} while (updatedVector.Values.Span[0] != 23 && attemptCount < DataTestFixtureBase<TTransport>.MaxAttemptCount);

Assert.Equal("update-vector-id-2", updatedVector.Id);
Assert.Equal(new[] { 23f, 3, 5, 7, 11, 13, 17, 19 }, updatedVector.Values);
Expand Down Expand Up @@ -249,11 +250,11 @@ public async Task Upsert_on_existing_vector_makes_an_update()
var attemptCount = 0;
do
{
await Task.Delay(DataTestFixtureBase.DelayInterval);
await Task.Delay(DataTestFixtureBase<TTransport>.DelayInterval);
attemptCount++;
var finalFetch = await Fixture.Index.Fetch(["update-vector-id-3"], testNamespace);
updatedVector = finalFetch["update-vector-id-3"];
} while (updatedVector.Values.Span[0] != 0 && attemptCount < DataTestFixtureBase.MaxAttemptCount);
} while (updatedVector.Values.Span[0] != 0 && attemptCount < DataTestFixtureBase<TTransport>.MaxAttemptCount);

Assert.Equal("update-vector-id-3", updatedVector.Id);
Assert.Equal(new[] { 0f, 1, 1, 2, 3, 5, 8, 13 }, updatedVector.Values);
Expand All @@ -278,11 +279,11 @@ public async Task Delete_all_vectors_from_namespace()
var attemptCount = 0;
do
{
await Task.Delay(DataTestFixtureBase.DelayInterval);
await Task.Delay(DataTestFixtureBase<TTransport>.DelayInterval);
attemptCount++;
stats = await Fixture.Index.DescribeStats();
} while (stats.Namespaces.Where(x => x.Name == testNamespace).Select(x => x.VectorCount).SingleOrDefault() > 0
&& attemptCount <= DataTestFixtureBase.MaxAttemptCount);
&& attemptCount <= DataTestFixtureBase<TTransport>.MaxAttemptCount);

Assert.Equal((uint)0, stats.Namespaces.Where(x => x.Name == testNamespace).Select(x => x.VectorCount).SingleOrDefault());
}
Expand Down
6 changes: 3 additions & 3 deletions test/DataTestFixtureBase.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
using Pinecone;
using Pinecone.Grpc;
using Xunit;

namespace PineconeTests;
public abstract class DataTestFixtureBase : IAsyncLifetime
public abstract class DataTestFixtureBase<T> : IAsyncLifetime
where T : ITransport<T>
{
public const int MaxAttemptCount = 100;
public const int DelayInterval = 300;
public abstract string IndexName { get; }

public PineconeClient Pinecone { get; private set; } = null!;

public virtual Index<GrpcTransport> Index { get; set; } = null!;
public virtual Index<T> Index { get; set; } = null!;

public virtual async Task InitializeAsync()
{
Expand Down
3 changes: 2 additions & 1 deletion test/IndexTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Pinecone;
using Pinecone.Grpc;
using Pinecone.Rest;
using PineconeTests.Xunit;
using Xunit;

Expand Down Expand Up @@ -53,7 +54,7 @@ public async Task Create_and_delete_index(Metric metric, bool serverless)
await pinecone.CreatePodBasedIndex(indexName, 3, metric, "gcp-starter");
}

Index<GrpcTransport> index;
Index<RestTransport> index;
var attemptCount = 0;
do
{
Expand Down
6 changes: 4 additions & 2 deletions test/PodBasedDataTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Pinecone;
using Pinecone.Rest;
using PineconeTests.Xunit;
using Xunit;

Expand All @@ -7,9 +8,10 @@ namespace PineconeTests;
[Collection("PineconeTests")]
[PineconeApiKeySetCondition]
[SkipTestCondition("Test environment uses free tier which does not support pod-based indexes.")]
public class PodBasedDataTests(PodBasedDataTests.PodBasedDataTestFixture fixture) : DataTestBase<PodBasedDataTests.PodBasedDataTestFixture>(fixture)
public class PodBasedDataTests(PodBasedDataTests.PodBasedDataTestFixture fixture)
: DataTestBase<PodBasedDataTests.PodBasedDataTestFixture, RestTransport>(fixture)
{
public class PodBasedDataTestFixture : DataTestFixtureBase
public class PodBasedDataTestFixture : DataTestFixtureBase<RestTransport>
{
public override string IndexName => "pod-data-tests";

Expand Down
38 changes: 35 additions & 3 deletions test/ServerlessDataTests.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
using Pinecone;
using Pinecone.Grpc;
using Pinecone.Rest;
using PineconeTests.Xunit;
using Xunit;

namespace PineconeTests;

[Collection("PineconeTests")]
[PineconeApiKeySetCondition]
public class ServerlessDataTests(ServerlessDataTests.ServerlessDataTestFixture fixture) : DataTestBase<ServerlessDataTests.ServerlessDataTestFixture>(fixture)
public class ServerlessRestTransportDataTests(ServerlessRestTransportDataTests.ServerlessDataTestFixture fixture)
: DataTestBase<ServerlessRestTransportDataTests.ServerlessDataTestFixture, RestTransport>(fixture)
{
public class ServerlessDataTestFixture : DataTestFixtureBase
public class ServerlessDataTestFixture : DataTestFixtureBase<RestTransport>
{
public override string IndexName => "serverless-data-tests";
public override string IndexName => "serverless-rest-data-tests";

protected override async Task CreateIndexAndWait()
{
Expand All @@ -31,3 +34,32 @@ protected override async Task CreateIndexAndWait()
}
}
}

[Collection("PineconeTests")]
[PineconeApiKeySetCondition]
public class ServerlessGrpcTransportDataTests(ServerlessGrpcTransportDataTests.ServerlessDataTestFixture fixture)
: DataTestBase<ServerlessGrpcTransportDataTests.ServerlessDataTestFixture, GrpcTransport>(fixture)
{
public class ServerlessDataTestFixture : DataTestFixtureBase<GrpcTransport>
{
public override string IndexName => "serverless-grpc-data-tests";

protected override async Task CreateIndexAndWait()
{
var attemptCount = 0;
await Pinecone.CreateServerlessIndex(IndexName, dimension: 8, metric: Metric.DotProduct, cloud: "aws", region: "us-east-1");

do
{
await Task.Delay(DelayInterval);
attemptCount++;
Index = await Pinecone.GetIndex<GrpcTransport>(IndexName);
} while (!Index.Status.IsReady && attemptCount <= MaxAttemptCount);

if (!Index.Status.IsReady)
{
throw new InvalidOperationException("'Create index' operation didn't complete in time. Index name: " + IndexName);
}
}
}
}

0 comments on commit e00bfe4

Please sign in to comment.