Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Generalize hard-coded UTF-8 encoding #9

Merged
merged 3 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 28 additions & 36 deletions etcd_client.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ from typing import Any, AsyncIterator, Final, Optional

@dataclass
class EtcdLockOption:
lock_name: str
lock_name: bytes
timeout: Optional[float]
ttl: Optional[int]

Expand All @@ -30,31 +30,31 @@ class CompareOp:

class Compare:
@staticmethod
def version(key: str, cmp: "CompareOp", version: int) -> "Compare": ...
def version(key: bytes, cmp: "CompareOp", version: int) -> "Compare": ...
"""
Compares the version of the given key.
"""
@staticmethod
def create_revision(key: str, cmp: "CompareOp", revision: int) -> "Compare": ...
def create_revision(key: bytes, cmp: "CompareOp", revision: int) -> "Compare": ...
"""
Compares the creation revision of the given key.
"""
@staticmethod
def mod_revision(key: str, cmp: "CompareOp", revision: int) -> "Compare": ...
def mod_revision(key: bytes, cmp: "CompareOp", revision: int) -> "Compare": ...
"""
Compares the last modified revision of the given key.
"""
@staticmethod
def value(key: str, cmp: "CompareOp", value: str) -> "Compare": ...
def value(key: bytes, cmp: "CompareOp", value: bytes) -> "Compare": ...
"""
Compares the value of the given key.
"""
@staticmethod
def lease(key: str, cmp: "CompareOp", lease: int) -> "Compare": ...
def lease(key: bytes, cmp: "CompareOp", lease: int) -> "Compare": ...
"""
Compares the lease id of the given key.
"""
def with_range(self, end: list[int]) -> "Compare": ...
def with_range(self, end: bytes) -> "Compare": ...
"""
Sets the comparison to scan the range [key, end).
"""
Expand Down Expand Up @@ -95,11 +95,11 @@ class TxnOp:
"""

@staticmethod
def get(key: str) -> "TxnOp": ...
def get(key: bytes) -> "TxnOp": ...
@staticmethod
def put(key: str, value: str) -> "TxnOp": ...
def put(key: bytes, value: bytes) -> "TxnOp": ...
@staticmethod
def delete(key: str) -> "TxnOp": ...
def delete(key: bytes) -> "TxnOp": ...
@staticmethod
def txn(txn: "Txn") -> "TxnOp": ...

Expand Down Expand Up @@ -154,21 +154,15 @@ class CondVar:
""" """

class Communicator:
async def get(self, key: str) -> str:
async def get(self, key: bytes) -> list[int]:
"""
Gets the key from the key-value store.
"""
async def get_prefix(self, key: str) -> dict[str, Any]:
async def get_prefix(self, key: bytes) -> list[tuple[list[int], list[int]]]:
"""
Gets the key from the key-value store.
"""
async def put(self, key: str, value: str) -> None:
"""
Put the given key into the key-value store.
A put request increments the revision of the key-value store
and generates one event in the event history.
"""
async def put_prefix(self, key: str, value: dict[str, Any]) -> None:
async def put(self, key: bytes, value: bytes) -> None:
"""
Put the given key into the key-value store.
A put request increments the revision of the key-value store
Expand All @@ -181,19 +175,17 @@ class Communicator:
and generates events with the same revision for every completed operation.
It is not allowed to modify the same key several times within one txn.
"""
async def delete(self, key: str) -> None:
async def delete(self, key: bytes) -> None:
"""
Deletes the given key from the key-value store.
"""
async def delete_prefix(self, key: str) -> None:
async def delete_prefix(self, key: bytes) -> None:
"""
Deletes the given key from the key-value store.
"""
async def keys_prefix(self, key: str) -> list[str]:
""" """
async def replace(self, key: str, initial_value: str, new_value: str) -> bool:
async def keys_prefix(self, key: bytes) -> list[list[int]]:
""" """
async def lock(self, name: str) -> None:
async def lock(self, name: bytes) -> None:
"""
Lock acquires a distributed shared lock on a given named lock.
On success, it will return a unique key that exists so long as the
Expand All @@ -202,7 +194,7 @@ class Communicator:
lock ownership. The lock is held until Unlock is called on the key or the
lease associate with the owner expires.
"""
async def unlock(self, key: str) -> None:
async def unlock(self, name: bytes) -> None:
"""
Unlock takes a key returned by Lock and releases the hold on lock. The
next Lock caller waiting for the lock will then be woken up and given
Expand All @@ -225,7 +217,7 @@ class Communicator:
"""
def watch(
self,
key: str,
key: bytes,
*,
once: Optional[bool] = False,
ready_event: Optional["CondVar"] = None,
Expand All @@ -238,7 +230,7 @@ class Communicator:
"""
def watch_prefix(
self,
key: str,
key: bytes,
*,
once: Optional[bool] = False,
ready_event: Optional["CondVar"] = None,
Expand All @@ -261,16 +253,16 @@ class Watch:
class WatchEvent:
""" """

key: str
value: str
key: bytes
value: bytes
event: "WatchEventType"
prev_value: Optional[str]
prev_value: Optional[bytes]

def __init__(
key: str,
value: str,
key: bytes,
value: bytes,
event: "WatchEventType",
prev_value: Optional[str] = None,
prev_value: Optional[bytes] = None,
) -> None: ...

class WatchEventType:
Expand All @@ -296,7 +288,7 @@ class CondVar:
class ClientError(Exception):
""" """

class GRpcStatusError(ClientError):
class GRPCStatusError(ClientError):
""" """

class InvalidArgsError(ClientError):
Expand Down Expand Up @@ -332,7 +324,7 @@ class EndpointError(ClientError):
class LockError(ClientError):
""" """

class GRpcStatusCode(Enum):
class GRPCStatusCode(Enum):
Ok = 0
"""The operation completed successfully."""

Expand Down
100 changes: 34 additions & 66 deletions src/communicator.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use etcd_client::{Client as EtcdClient, PutOptions};
use etcd_client::Client as EtcdClient;
use etcd_client::{DeleteOptions, GetOptions, WatchOptions};
use pyo3::exceptions::PyException;
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use pyo3_asyncio::tokio::future_into_py;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;

Expand All @@ -19,16 +18,17 @@ pub struct PyCommunicator(pub Arc<Mutex<EtcdClient>>);
#[pymethods]
impl PyCommunicator {
// TODO: Implement and use the CRUD response types
fn get<'a>(&'a self, py: Python<'a>, key: String) -> PyResult<&'a PyAny> {
fn get<'a>(&'a self, py: Python<'a>, key: &PyBytes) -> PyResult<&'a PyAny> {
let client = self.0.clone();
let key = key.as_bytes().to_vec();
future_into_py(py, async move {
let mut client = client.lock().await;
let result = client.get(key, None).await;
result
.map(|response| {
let kvs = response.kvs();
if !kvs.is_empty() {
Some(String::from_utf8(kvs[0].value().to_owned()).unwrap())
Some(kvs[0].value().to_owned())
} else {
None
}
Expand All @@ -37,53 +37,43 @@ impl PyCommunicator {
})
}

fn get_prefix<'a>(&'a self, py: Python<'a>, prefix: String) -> PyResult<&'a PyAny> {
fn get_prefix<'a>(&'a self, py: Python<'a>, prefix: &PyBytes) -> PyResult<&'a PyAny> {
let client = self.0.clone();
let prefix = prefix.as_bytes().to_vec();

future_into_py(py, async move {
let mut client = client.lock().await;
let options = GetOptions::new().with_prefix();
let result = client.get(prefix, Some(options)).await;
result
.map(|response| {
let mut result = HashMap::new();
let mut list = vec![];
let kvs = response.kvs();
for kv in kvs {
let key = String::from_utf8(kv.key().to_owned()).unwrap();
let value = String::from_utf8(kv.value().to_owned()).unwrap();
result.insert(key, value);
list.push((kv.key().to_owned(), kv.value().to_owned()));
}
result
list
})
.map_err(|e| PyClientError(e).into())
})
}

fn put<'a>(&'a self, py: Python<'a>, key: String, value: String) -> PyResult<&'a PyAny> {
fn put<'a>(&'a self, py: Python<'a>, key: &PyBytes, value: &PyBytes) -> PyResult<&'a PyAny> {
let client = self.0.clone();
let key = key.as_bytes().to_vec();
let value = value.as_bytes().to_vec();

future_into_py(py, async move {
let mut client = client.lock().await;
let result = client.put(key, value, None).await;
result.map(|_| ()).map_err(|e| PyClientError(e).into())
})
}

fn put_prefix<'a>(
&'a self,
py: Python<'a>,
prefix: String,
value: String,
) -> PyResult<&'a PyAny> {
fn delete<'a>(&'a self, py: Python<'a>, key: &PyBytes) -> PyResult<&'a PyAny> {
let client = self.0.clone();
future_into_py(py, async move {
let mut client = client.lock().await;
let options = PutOptions::new().with_prev_key();
let result = client.put(prefix, value, Some(options)).await;
result.map(|_| ()).map_err(|e| PyClientError(e).into())
})
}
let key = key.as_bytes().to_vec();

fn delete<'a>(&'a self, py: Python<'a>, key: String) -> PyResult<&'a PyAny> {
let client = self.0.clone();
future_into_py(py, async move {
let mut client = client.lock().await;

Expand All @@ -92,8 +82,10 @@ impl PyCommunicator {
})
}

fn delete_prefix<'a>(&'a self, py: Python<'a>, key: String) -> PyResult<&'a PyAny> {
fn delete_prefix<'a>(&'a self, py: Python<'a>, key: &PyBytes) -> PyResult<&'a PyAny> {
let client = self.0.clone();
let key = key.as_bytes().to_vec();

future_into_py(py, async move {
let mut client = client.lock().await;
let options = DeleteOptions::new().with_prefix();
Expand All @@ -114,39 +106,10 @@ impl PyCommunicator {
})
}

fn replace<'a>(
&'a self,
py: Python<'a>,
key: String,
initial_val: String,
new_val: String,
) -> PyResult<&'a PyAny> {
fn keys_prefix<'a>(&'a self, py: Python<'a>, key: &PyBytes) -> PyResult<&'a PyAny> {
let client = self.0.clone();
future_into_py(py, async move {
let mut client = client.lock().await;
match client.get(key.clone(), None).await {
Ok(response) => {
if let Some(key_value) = response.kvs().get(0) {
if *key_value.value_str().unwrap() == initial_val {
match client.put(key, new_val, None).await {
Ok(_) => Ok(true), // replace successful
Err(e) => Err(PyClientError(e)),
}
} else {
Ok(false) // initial_val not equal to current value
}
} else {
Ok(false) // Key does not exist
}
}
Err(e) => Err(PyClientError(e)),
}
.map_err(|e| PyErr::new::<PyException, _>(format!("{}", e.0)))
})
}
let key = key.as_bytes().to_vec();

fn keys_prefix<'a>(&'a self, py: Python<'a>, key: String) -> PyResult<&'a PyAny> {
let client = self.0.clone();
future_into_py(py, async move {
let mut client = client.lock().await;
let options = GetOptions::new().with_prefix();
Expand All @@ -156,29 +119,32 @@ impl PyCommunicator {
let mut result = Vec::new();
let kvs = response.kvs();
for kv in kvs {
let key = String::from_utf8(kv.key().to_owned()).unwrap();
result.push(key);
result.push(kv.key().to_owned());
}
result
})
.map_err(|e| PyClientError(e).into())
})
}

fn lock<'a>(&'a self, py: Python<'a>, name: String) -> PyResult<&'a PyAny> {
fn lock<'a>(&'a self, py: Python<'a>, name: &PyBytes) -> PyResult<&'a PyAny> {
let client = self.0.clone();
let name = name.as_bytes().to_vec();

future_into_py(py, async move {
let mut client = client.lock().await;
let result = client.lock(name, None).await;
result.map(|_| ()).map_err(|e| PyClientError(e).into())
})
}

fn unlock<'a>(&'a self, py: Python<'a>, key: String) -> PyResult<&'a PyAny> {
fn unlock<'a>(&'a self, py: Python<'a>, name: &PyBytes) -> PyResult<&'a PyAny> {
let client = self.0.clone();
let name = name.as_bytes().to_vec();

future_into_py(py, async move {
let mut client = client.lock().await;
let result = client.unlock(key).await;
let result = client.unlock(name).await;
result.map(|_| ()).map_err(|e| PyClientError(e).into())
})
}
Expand Down Expand Up @@ -222,24 +188,26 @@ impl PyCommunicator {

fn watch(
&self,
key: String,
key: &PyBytes,
once: Option<bool>,
ready_event: Option<PyCondVar>,
cleanup_event: Option<PyCondVar>,
) -> PyWatch {
let client = self.0.clone();
let key = key.as_bytes().to_vec();
let once = once.unwrap_or(false);
PyWatch::new(client, key, once, None, ready_event, cleanup_event)
}

fn watch_prefix(
&self,
key: String,
key: &PyBytes,
once: Option<bool>,
ready_event: Option<PyCondVar>,
cleanup_event: Option<PyCondVar>,
) -> PyWatch {
let client = self.0.clone();
let key = key.as_bytes().to_vec();
let once = once.unwrap_or(false);
let options = WatchOptions::new().with_prefix();
PyWatch::new(client, key, once, Some(options), ready_event, cleanup_event)
Expand Down
Loading
Loading