Skip to content

Commit

Permalink
Update harness
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Apr 9, 2024
1 parent b19e7f9 commit a6753bb
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 23 deletions.
5 changes: 3 additions & 2 deletions src/communicator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use etcd_client::{Client as EtcdClient, PutOptions};
use etcd_client::Client as EtcdClient;
use etcd_client::{DeleteOptions, GetOptions, WatchOptions};
use pyo3::prelude::*;
use pyo3::types::PyBytes;
Expand All @@ -18,8 +18,9 @@ pub struct PyCommunicator(pub Arc<Mutex<EtcdClient>>);
#[pymethods]
impl PyCommunicator {
// TODO: Implement and use the CRUD response types
fn get<'a>(&'a self, py: Python<'a>, key: String) -> PyResult<&'a PyAny> {
fn get<'a>(&'a self, py: Python<'a>, key: &PyBytes) -> PyResult<&'a PyAny> {
let client = self.0.clone();
let key = key.as_bytes().to_vec();
future_into_py(py, async move {
let mut client = client.lock().await;
let result = client.get(key, None).await;
Expand Down
2 changes: 0 additions & 2 deletions src/lock_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ pub struct EtcdLockManager {

impl EtcdLockManager {
pub fn new(client: PyClient, lock_opt: PyEtcdLockOption) -> Self {
let lock_name = lock_opt.lock_name.clone();

Self {
client,
lock_name: lock_opt.lock_name,
Expand Down
34 changes: 15 additions & 19 deletions tests/harness.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
using callbacks in separate threads.
"""

from __future__ import annotations

import asyncio
from dataclasses import dataclass
import enum
Expand Down Expand Up @@ -70,7 +68,7 @@ def __str__(self):
return f"{self.host}:{self.port}"

@classmethod
def parse(cls, s: str) -> HostPortPair:
def parse(cls, s: str) -> "HostPortPair":
if ":" in s:
host, port_str = s.rsplit(":")
port = int(port_str)
Expand Down Expand Up @@ -226,7 +224,7 @@ async def put(
scope_prefix = self._merge_scope_prefix_map(scope_prefix_map)[scope]
mangled_key = self._mangle_key(f"{_slash(scope_prefix)}{key}")
async with self.etcd.connect() as communicator:
await communicator.put(mangled_key, str(val))
await communicator.put(mangled_key.encode(self.encoding), str(val).encode(self.encoding))

async def put_prefix(
self,
Expand Down Expand Up @@ -266,7 +264,7 @@ def _flatten(prefix: str, inner_dict: NestedStrKeyedDict) -> None:
actions = []
for k, v in flattened_dict.items():
actions.append(
TxnOp.put(self._mangle_key(f"{_slash(scope_prefix)}{k}"), str(v))
TxnOp.put(self._mangle_key(f"{_slash(scope_prefix)}{k}").encode(self.encoding), str(v).encode(self.encoding))
)

await communicator.txn(
Expand Down Expand Up @@ -295,7 +293,7 @@ async def put_dict(
actions = []
for k, v in flattened_dict_obj.items():
actions.append(
TxnOp.put(self._mangle_key(f"{_slash(scope_prefix)}{k}"), str(v))
TxnOp.put(self._mangle_key(f"{_slash(scope_prefix)}{k}").encode(self.encoding), str(v).encode(self.encoding))
)

async with self.etcd.connect() as communicator:
Expand Down Expand Up @@ -343,11 +341,10 @@ async def get(
async with self.etcd.connect() as communicator:
for scope_prefix in scope_prefixes:
value = await communicator.get(
self._mangle_key(f"{_slash(scope_prefix)}{key}")
self._mangle_key(f"{_slash(scope_prefix)}{key}").encode(self.encoding)
)
if value is not None:
value = bytes(value).decode(self.encoding)
return value
return bytes(value).decode(self.encoding)
return None

async def get_prefix(
Expand Down Expand Up @@ -416,9 +413,8 @@ async def get_prefix(
mangled_key_prefix = self._mangle_key(
f"{_slash(scope_prefix)}{key_prefix}"
)
values = await communicator.get_prefix(mangled_key_prefix)
values = await communicator.get_prefix(mangled_key_prefix.encode(self.encoding))
pair_sets.append(
# [(self._demangle_key(bytes(k)), bytes(v).decode(self.encoding)) for k, v in values]
[(self._demangle_key(bytes(k).decode(self.encoding)), bytes(v).decode(self.encoding)) for k, v in values]
)

Expand Down Expand Up @@ -448,10 +444,10 @@ async def replace(
EtcdTransactionAction()
.when(
[
Compare.value(mangled_key, CompareOp.EQUAL, initial_val),
Compare.value(mangled_key.encode(self.encoding), CompareOp.EQUAL, initial_val.encode(self.encoding)),
]
)
.and_then([TxnOp.put(mangled_key, new_val)])
.and_then([TxnOp.put(mangled_key.encode(self.encoding), new_val.encode(self.encoding))])
.or_else([])
)

Expand All @@ -467,7 +463,7 @@ async def delete(
scope_prefix = self._merge_scope_prefix_map(scope_prefix_map)[scope]
mangled_key = self._mangle_key(f"{_slash(scope_prefix)}{key}")
async with self.etcd.connect() as communicator:
await communicator.delete(mangled_key)
await communicator.delete(mangled_key.encode(self.encoding))

async def delete_multi(
self,
Expand All @@ -481,7 +477,7 @@ async def delete_multi(
actions = []
for k in keys:
actions.append(
TxnOp.delete(self._mangle_key(f"{_slash(scope_prefix)}{k}"))
TxnOp.delete(self._mangle_key(f"{_slash(scope_prefix)}{k}").encode(self.encoding))
)
communicator.txn(EtcdTransactionAction().and_then(actions).or_else([]))

Expand All @@ -495,7 +491,7 @@ async def delete_prefix(
scope_prefix = self._merge_scope_prefix_map(scope_prefix_map)[scope]
mangled_key_prefix = self._mangle_key(f"{_slash(scope_prefix)}{key_prefix}")
async with self.etcd.connect() as communicator:
await communicator.delete_prefix(mangled_key_prefix)
await communicator.delete_prefix(mangled_key_prefix.encode(self.encoding))

async def _watch_impl(
self,
Expand All @@ -516,7 +512,7 @@ async def _watch_impl(
)
except asyncio.TimeoutError:
pass
yield Event(ev.key[scope_prefix_len:], ev.event, ev.value)
yield Event(bytes(ev.key).decode(self.encoding)[scope_prefix_len:], ev.event, bytes(ev.value).decode(self.encoding))
if once:
return
finally:
Expand All @@ -543,7 +539,7 @@ async def watch(
try:
async for ev in self._watch_impl(
lambda communicator: communicator.watch(
mangled_key,
mangled_key.encode(self.encoding),
ready_event=ready_event,
),
scope_prefix_len,
Expand Down Expand Up @@ -585,7 +581,7 @@ async def watch_prefix(
try:
async for ev in self._watch_impl(
lambda communicator: communicator.watch_prefix(
mangled_key_prefix,
mangled_key_prefix.encode(self.encoding),
ready_event=ready_event,
),
scope_prefix_len,
Expand Down

0 comments on commit a6753bb

Please sign in to comment.