diff --git a/contrib/query.py b/contrib/query.py index c27ddbd3a..2c96a4de4 100755 --- a/contrib/query.py +++ b/contrib/query.py @@ -72,7 +72,7 @@ async def query(args): if not hashX: continue n = None - history = await db.limited_history(hashX, limit=limit) + history = await db.limited_history(hashX=hashX, limit=limit) for n, (tx_hash, height) in enumerate(history, start=1): print(f'History #{n:,d}: height {height:,d} ' f'tx_hash {hash_to_hex_str(tx_hash)}') diff --git a/docs/HOWTO.rst b/docs/HOWTO.rst index b1e8a8bd0..79e04d5a4 100644 --- a/docs/HOWTO.rst +++ b/docs/HOWTO.rst @@ -29,17 +29,20 @@ functions. For example, `x11_hash`_ is required for DASH. Scrypt coins require a Python interpreter compiled and/or linked with OpenSSL 1.1.0 or higher. -You **must** be running a non-pruning bitcoin daemon with:: +You **must** be running a non-pruning bitcoin daemon. +It is also recommended to have:: txindex=1 -set in its configuration file. If you have an existing installation -of bitcoind and have not previously set this you will need to reindex -the blockchain with:: +set in its configuration file, for better performance when serving many sessions. +If you have an existing installation of bitcoind and have not previously set this +but you do now, you will need to reindex the blockchain with:: bitcoind -reindex which can take some time. +If you intend to use a bitcoind without txindex enabled, you need to configure the +`DAEMON_HAS_TXINDEX` :ref:`environment variable `. While not a requirement for running ElectrumX, it is intended to be run with supervisor software such as Daniel Bernstein's diff --git a/docs/environment.rst b/docs/environment.rst index 4139e50c1..8d1552e13 100644 --- a/docs/environment.rst +++ b/docs/environment.rst @@ -281,13 +281,14 @@ These environment variables are optional: version string. For example to drop versions from 1.0 to 1.2 use the regex ``1\.[0-2]\.\d+``. -.. envvar:: DROP_CLIENT_UNKNOWN +.. envvar:: DAEMON_HAS_TXINDEX - Set to anything non-empty to deny serving clients which do not - identify themselves first by issuing the server.version method - call with a non-empty client identifier. The connection is dropped - on first actual method call. This might help to filter out simple - robots. This behavior is off by default. + Set this environment variable to empty if the connected bitcoind + does not have txindex enabled. Defaults to True (has txindex). + We rely on bitcoind to lookup arbitrary txs, regardless of this setting. + Without txindex, tx lookup works as in `bitcoind PR #10275`_. + Note that performance when serving many sessions is better with txindex + (but initial sync should be unaffected). Resource Usage Limits @@ -506,3 +507,4 @@ your available physical RAM: .. _lib/coins.py: https://github.com/spesmilo/electrumx/blob/master/electrumx/lib/coins.py .. _uvloop: https://pypi.python.org/pypi/uvloop +.. _bitcoind PR #10275: https://github.com/bitcoin/bitcoin/pull/10275 diff --git a/electrumx/lib/tx.py b/electrumx/lib/tx.py index faf210bc2..6fe7b617d 100644 --- a/electrumx/lib/tx.py +++ b/electrumx/lib/tx.py @@ -29,7 +29,7 @@ from dataclasses import dataclass from hashlib import blake2s -from typing import Sequence +from typing import Sequence, Optional from electrumx.lib.hash import sha256, double_sha256, hash_to_hex_str from electrumx.lib.script import OpCodes @@ -105,6 +105,13 @@ def serialize(self): )) +@dataclass +class TXOSpendStatus: + prev_height: Optional[int] # block height TXO is mined at. 
None if the outpoint never existed + spender_txhash: bytes = None + spender_height: int = None + + class Deserializer: '''Deserializes blocks into transactions. diff --git a/electrumx/lib/util.py b/electrumx/lib/util.py index 82a7daf58..5789b7e6c 100644 --- a/electrumx/lib/util.py +++ b/electrumx/lib/util.py @@ -34,6 +34,7 @@ import sys from collections.abc import Container, Mapping from struct import Struct +from typing import Optional # Use system-compiled JSON lib if available, fallback to stdlib @@ -162,7 +163,7 @@ def chunks(items, size): yield items[i: i + size] -def resolve_limit(limit): +def resolve_limit(limit: Optional[int]) -> int: if limit is None or limit < 0: return -1 assert isinstance(limit, int) @@ -316,6 +317,7 @@ def protocol_version(client_req, min_tuple, max_tuple): struct_le_Q = Struct('H') struct_be_I = Struct('>I') +struct_be_Q = Struct('>Q') structB = Struct('B') unpack_le_int32_from = struct_le_i.unpack_from @@ -329,6 +331,7 @@ def protocol_version(client_req, min_tuple, max_tuple): unpack_le_uint32 = struct_le_I.unpack unpack_le_uint64 = struct_le_Q.unpack unpack_be_uint32 = struct_be_I.unpack +unpack_be_uint64 = struct_be_Q.unpack pack_le_int32 = struct_le_i.pack pack_le_int64 = struct_le_q.pack @@ -337,6 +340,7 @@ def protocol_version(client_req, min_tuple, max_tuple): pack_le_uint64 = struct_le_Q.pack pack_be_uint16 = struct_be_H.pack pack_be_uint32 = struct_be_I.pack +pack_be_uint64 = struct_be_Q.pack pack_byte = structB.pack hex_to_bytes = bytes.fromhex diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index dd4b9a590..f6a1faee6 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -11,7 +11,7 @@ import asyncio import time -from typing import Sequence, Tuple, List, Callable, Optional, TYPE_CHECKING, Type +from typing import Sequence, Tuple, List, Callable, Optional, TYPE_CHECKING, Type, Set from aiorpcx import TaskGroup, run_in_thread, CancelledError @@ -23,8 +23,8 @@ chunks, class_logger, pack_le_uint32, pack_le_uint64, unpack_le_uint64 ) from electrumx.lib.tx import Tx -from electrumx.server.db import FlushData, COMP_TXID_LEN, DB -from electrumx.server.history import TXNUM_LEN +from electrumx.server.db import FlushData, DB +from electrumx.server.history import TXNUM_LEN, TXOUTIDX_LEN, TXOUTIDX_PADDING, pack_txnum if TYPE_CHECKING: from electrumx.lib.coins import Coin @@ -176,7 +176,8 @@ def __init__(self, env: 'Env', db: DB, daemon: Daemon, notifications: 'Notificat # Meta self.next_cache_check = 0 - self.touched = set() + self.touched_hashxs = set() # type: Set[bytes] + self.touched_outpoints = set() # type: Set[Tuple[bytes, int]] self.reorg_count = 0 self.height = -1 self.tip = None # type: Optional[bytes] @@ -186,7 +187,9 @@ def __init__(self, env: 'Env', db: DB, daemon: Daemon, notifications: 'Notificat # Caches of unflushed items. 
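+        # (block headers, per-block tx hashes and undo data accumulate here between flushes)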
self.headers = [] - self.tx_hashes = [] + self.tx_hashes = [] # type: List[bytes] + self.undo_tx_hashes = [] # type: List[bytes] + self.undo_historical_spends = [] # type: List[bytes] self.undo_infos = [] # type: List[Tuple[Sequence[bytes], int]] # UTXO cache @@ -234,8 +237,13 @@ async def check_and_advance_blocks(self, raw_blocks): self.logger.info(f'processed {len(blocks):,d} block{s} size {blocks_size:.2f} MB ' f'in {time.monotonic() - start:.1f}s') if self._caught_up_event.is_set(): - await self.notifications.on_block(self.touched, self.height) - self.touched = set() + await self.notifications.on_block( + touched_hashxs=self.touched_hashxs, + touched_outpoints=self.touched_outpoints, + height=self.height, + ) + self.touched_hashxs = set() + self.touched_outpoints = set() elif hprevs[0] != chain[0]: await self.reorg_chain() else: @@ -269,10 +277,10 @@ async def get_raw_blocks(last_height, hex_hashes) -> Sequence[bytes]: return await self.daemon.raw_blocks(hex_hashes) def flush_backup(): - # self.touched can include other addresses which is + # self.touched_hashxs can include other addresses which is # harmless, but remove None. - self.touched.discard(None) - self.db.flush_backup(self.flush_data(), self.touched) + self.touched_hashxs.discard(None) + self.db.flush_backup(self.flush_data(), self.touched_hashxs) _start, last, hashes = await self.reorg_hashes(count) # Reverse and convert to hex strings. @@ -347,9 +355,18 @@ def estimate_txs_remaining(self): def flush_data(self): '''The data for a flush. The lock must be taken.''' assert self.state_lock.locked() - return FlushData(self.height, self.tx_count, self.headers, - self.tx_hashes, self.undo_infos, self.utxo_cache, - self.db_deletes, self.tip) + return FlushData( + height=self.height, + tx_count=self.tx_count, + headers=self.headers, + block_tx_hashes=self.tx_hashes, + undo_block_tx_hashes=self.undo_tx_hashes, + undo_historical_spends=self.undo_historical_spends, + undo_infos=self.undo_infos, + adds=self.utxo_cache, + deletes=self.db_deletes, + tip=self.tip, + ) async def flush(self, flush_utxos): def flush(): @@ -368,7 +385,7 @@ async def _maybe_flush(self): await self.flush(flush_arg) self.next_cache_check = time.monotonic() + 30 - def check_cache_size(self): + def check_cache_size(self) -> Optional[bool]: '''Flush a cache if it gets too big.''' # Good average estimates based on traversal of subobjects and # requesting size from Python (see deep_getsizeof). 
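The advance_txs hunks below (and the history.py changes further down) replace the old truncated little-endian tx_num packing with fixed-width helpers: tx_nums become TXNUM_LEN (5) big-endian bytes, so per-hashX keys sort in tx_num order, and output indices become TXOUTIDX_LEN (3) truncated little-endian bytes. A minimal round-trip sketch of the packing this patch assumes (helper names mirror electrumx/server/history.py and electrumx/lib/util.py; the sample values are illustrative only):

    from struct import Struct

    TXNUM_LEN = 5                        # tx_num stored big-endian so DB keys sort numerically
    TXNUM_PADDING = bytes(8 - TXNUM_LEN)
    TXOUTIDX_LEN = 3                     # txout index stored little-endian, truncated to 3 bytes
    TXOUTIDX_PADDING = bytes(4 - TXOUTIDX_LEN)

    pack_be_uint64 = Struct('>Q').pack
    unpack_be_uint64 = Struct('>Q').unpack
    pack_le_uint32 = Struct('<I').pack
    unpack_le_uint32 = Struct('<I').unpack

    def pack_txnum(tx_num: int) -> bytes:
        return pack_be_uint64(tx_num)[-TXNUM_LEN:]

    def unpack_txnum(tx_numb: bytes) -> int:
        return unpack_be_uint64(TXNUM_PADDING + tx_numb)[0]

    # illustrative round-trip for a (tx_num, txout_idx) key suffix
    tx_num, txout_idx = 123_456_789, 2
    suffix = pack_txnum(tx_num) + pack_le_uint32(txout_idx)[:TXOUTIDX_LEN]
    assert len(suffix) == TXNUM_LEN + TXOUTIDX_LEN
    assert unpack_txnum(suffix[:TXNUM_LEN]) == tx_num
    assert unpack_le_uint32(suffix[TXNUM_LEN:] + TXOUTIDX_PADDING)[0] == txout_idx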
@@ -423,8 +440,6 @@ def advance_txs( txs: Sequence[Tuple[Tx, bytes]], is_unspendable: Callable[[bytes], bool], ) -> Sequence[bytes]: - self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs)) - # Use local vars for speed in the loops undo_info = [] tx_num = self.tx_count @@ -432,16 +447,22 @@ def advance_txs( put_utxo = self.utxo_cache.__setitem__ spend_utxo = self.spend_utxo undo_info_append = undo_info.append - update_touched = self.touched.update + update_touched_hashxs = self.touched_hashxs.update + add_touched_outpoint = self.touched_outpoints.add hashXs_by_tx = [] append_hashXs = hashXs_by_tx.append + txhash_to_txnum_map = {} + put_txhash_to_txnum_map = txhash_to_txnum_map.__setitem__ + txo_to_spender_map = {} + put_txo_to_spender_map = txo_to_spender_map.__setitem__ to_le_uint32 = pack_le_uint32 to_le_uint64 = pack_le_uint64 + _pack_txnum = pack_txnum for tx, tx_hash in txs: hashXs = [] append_hashX = hashXs.append - tx_numb = to_le_uint64(tx_num)[:TXNUM_LEN] + tx_numb = _pack_txnum(tx_num) # Spend the inputs for txin in tx.inputs: @@ -450,6 +471,9 @@ def advance_txs( cache_value = spend_utxo(txin.prev_hash, txin.prev_idx) undo_info_append(cache_value) append_hashX(cache_value[:HASHX_LEN]) + prevout_tuple = (txin.prev_hash, txin.prev_idx) + put_txo_to_spender_map(prevout_tuple, tx_hash) + add_touched_outpoint(prevout_tuple) # Add the new UTXOs for idx, txout in enumerate(tx.outputs): @@ -460,14 +484,22 @@ def advance_txs( # Get the hashX hashX = script_hashX(txout.pk_script) append_hashX(hashX) - put_utxo(tx_hash + to_le_uint32(idx), + put_utxo(tx_hash + to_le_uint32(idx)[:TXOUTIDX_LEN], hashX + tx_numb + to_le_uint64(txout.value)) + add_touched_outpoint((tx_hash, idx)) append_hashXs(hashXs) - update_touched(hashXs) + update_touched_hashxs(hashXs) + put_txhash_to_txnum_map(tx_hash, tx_num) tx_num += 1 - self.db.history.add_unflushed(hashXs_by_tx, self.tx_count) + self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs)) + self.db.history.add_unflushed( + hashXs_by_tx=hashXs_by_tx, + first_tx_num=self.tx_count, + txhash_to_txnum_map=txhash_to_txnum_map, + txo_to_spender_map=txo_to_spender_map, + ) self.tx_count = tx_num self.db.tx_counts.append(tx_num) @@ -519,7 +551,9 @@ def backup_txs( # Use local vars for speed in the loops put_utxo = self.utxo_cache.__setitem__ spend_utxo = self.spend_utxo - touched = self.touched + add_touched_hashx = self.touched_hashxs.add + add_touched_outpoint = self.touched_outpoints.add + undo_hist_spend = self.undo_historical_spends.append undo_entry_len = HASHX_LEN + TXNUM_LEN + 8 for tx, tx_hash in reversed(txs): @@ -532,7 +566,8 @@ def backup_txs( # Get the hashX cache_value = spend_utxo(tx_hash, idx) hashX = cache_value[:HASHX_LEN] - touched.add(hashX) + add_touched_hashx(hashX) + add_touched_outpoint((tx_hash, idx)) # Restore the inputs for txin in reversed(tx.inputs): @@ -540,9 +575,14 @@ def backup_txs( continue n -= undo_entry_len undo_item = undo_info[n:n + undo_entry_len] - put_utxo(txin.prev_hash + pack_le_uint32(txin.prev_idx), undo_item) + prevout = txin.prev_hash + pack_le_uint32(txin.prev_idx)[:TXOUTIDX_LEN] + put_utxo(prevout, undo_item) hashX = undo_item[:HASHX_LEN] - touched.add(hashX) + add_touched_hashx(hashX) + add_touched_outpoint((txin.prev_hash, txin.prev_idx)) + undo_hist_spend(prevout) + + self.undo_tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs)) assert n == 0 self.tx_count -= len(txs) @@ -587,21 +627,14 @@ def backup_txs( To this end we maintain two "tables", one for each point above: - 1. 
Key: b'u' + address_hashX + tx_idx + tx_num + 1. Key: b'u' + address_hashX + tx_num + txout_idx Value: the UTXO value as a 64-bit unsigned integer - 2. Key: b'h' + compressed_tx_hash + tx_idx + tx_num + 2. Key: b'h' + tx_num + txout_idx Value: hashX - - The compressed tx hash is just the first few bytes of the hash of - the tx in which the UTXO was created. As this is not unique there - will be potential collisions so tx_num is also in the key. When - looking up a UTXO the prefix space of the compressed hash needs to - be searched and resolved if necessary with the tx_num. The - collision rate is low (<0.1%). ''' - def spend_utxo(self, tx_hash: bytes, tx_idx: int) -> bytes: + def spend_utxo(self, tx_hash: bytes, txout_idx: int) -> bytes: '''Spend a UTXO and return (hashX + tx_num + value_sats). If the UTXO is not in the cache it must be on disk. We store @@ -609,42 +642,36 @@ def spend_utxo(self, tx_hash: bytes, tx_idx: int) -> bytes: corruption. ''' # Fast track is it being in the cache - idx_packed = pack_le_uint32(tx_idx) + idx_packed = pack_le_uint32(txout_idx)[:TXOUTIDX_LEN] cache_value = self.utxo_cache.pop(tx_hash + idx_packed, None) if cache_value: return cache_value # Spend it from the DB. - txnum_padding = bytes(8-TXNUM_LEN) + tx_num = self.db.fs_txnum_for_txhash(tx_hash) + if tx_num is None: + raise ChainError(f'UTXO {hash_to_hex_str(tx_hash)} / {txout_idx:,d} has ' + f'no corresponding tx_num in DB') + tx_numb = pack_txnum(tx_num) - # Key: b'h' + compressed_tx_hash + tx_idx + tx_num + # Key: b'h' + tx_num + txout_idx # Value: hashX - prefix = b'h' + tx_hash[:COMP_TXID_LEN] + idx_packed - candidates = {db_key: hashX for db_key, hashX - in self.db.utxo_db.iterator(prefix=prefix)} - - for hdb_key, hashX in candidates.items(): - tx_num_packed = hdb_key[-TXNUM_LEN:] - - if len(candidates) > 1: - tx_num, = unpack_le_uint64(tx_num_packed + txnum_padding) - hash, _height = self.db.fs_tx_hash(tx_num) - if hash != tx_hash: - assert hash is not None # Should always be found - continue - - # Key: b'u' + address_hashX + tx_idx + tx_num - # Value: the UTXO value as a 64-bit unsigned integer - udb_key = b'u' + hashX + hdb_key[-4-TXNUM_LEN:] - utxo_value_packed = self.db.utxo_db.get(udb_key) - if utxo_value_packed: - # Remove both entries for this UTXO - self.db_deletes.append(hdb_key) - self.db_deletes.append(udb_key) - return hashX + tx_num_packed + utxo_value_packed - - raise ChainError(f'UTXO {hash_to_hex_str(tx_hash)} / {tx_idx:,d} not ' - f'found in "h" table') + hdb_key = b'h' + tx_numb + idx_packed + hashX = self.db.utxo_db.get(hdb_key) + if hashX is None: + raise ChainError(f'UTXO {hash_to_hex_str(tx_hash)} / {txout_idx:,d} not ' + f'found in "h" table') + # Key: b'u' + address_hashX + tx_num + txout_idx + # Value: the UTXO value as a 64-bit unsigned integer + udb_key = b'u' + hashX + tx_numb + idx_packed + utxo_value_packed = self.db.utxo_db.get(udb_key) + if utxo_value_packed is None: + raise ChainError(f'UTXO {hash_to_hex_str(tx_hash)} / {txout_idx:,d} not ' + f'found in "u" table') + # Remove both entries for this UTXO + self.db_deletes.append(hdb_key) + self.db_deletes.append(udb_key) + return hashX + tx_numb + utxo_value_packed async def _process_prefetched_blocks(self): '''Loop forever processing blocks as they arrive.''' @@ -735,7 +762,7 @@ def advance_txs(self, txs, is_unspendable): tx_num = self.tx_count - len(txs) script_name_hashX = self.coin.name_hashX_from_script - update_touched = self.touched.update + update_touched_hashxs = self.touched_hashxs.update hashXs_by_tx = [] 
append_hashXs = hashXs_by_tx.append @@ -751,10 +778,15 @@ def advance_txs(self, txs, is_unspendable): append_hashX(hashX) append_hashXs(hashXs) - update_touched(hashXs) + update_touched_hashxs(hashXs) tx_num += 1 - self.db.history.add_unflushed(hashXs_by_tx, self.tx_count - len(txs)) + self.db.history.add_unflushed( + hashXs_by_tx=hashXs_by_tx, + first_tx_num=self.tx_count - len(txs), + txhash_to_txnum_map={}, + txo_to_spender_map={}, + ) return result @@ -762,8 +794,6 @@ def advance_txs(self, txs, is_unspendable): class LTORBlockProcessor(BlockProcessor): def advance_txs(self, txs, is_unspendable): - self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs)) - # Use local vars for speed in the loops undo_info = [] tx_num = self.tx_count @@ -771,16 +801,22 @@ def advance_txs(self, txs, is_unspendable): put_utxo = self.utxo_cache.__setitem__ spend_utxo = self.spend_utxo undo_info_append = undo_info.append - update_touched = self.touched.update + update_touched_hashxs = self.touched_hashxs.update + add_touched_outpoint = self.touched_outpoints.add + txhash_to_txnum_map = {} + put_txhash_to_txnum_map = txhash_to_txnum_map.__setitem__ + txo_to_spender_map = {} + put_txo_to_spender_map = txo_to_spender_map.__setitem__ to_le_uint32 = pack_le_uint32 to_le_uint64 = pack_le_uint64 + _pack_txnum = pack_txnum hashXs_by_tx = [set() for _ in txs] # Add the new UTXOs for (tx, tx_hash), hashXs in zip(txs, hashXs_by_tx): add_hashXs = hashXs.add - tx_numb = to_le_uint64(tx_num)[:TXNUM_LEN] + tx_numb = _pack_txnum(tx_num) for idx, txout in enumerate(tx.outputs): # Ignore unspendable outputs @@ -790,8 +826,10 @@ def advance_txs(self, txs, is_unspendable): # Get the hashX hashX = script_hashX(txout.pk_script) add_hashXs(hashX) - put_utxo(tx_hash + to_le_uint32(idx), + put_utxo(tx_hash + to_le_uint32(idx)[:TXOUTIDX_LEN], hashX + tx_numb + to_le_uint64(txout.value)) + add_touched_outpoint((tx_hash, idx)) + put_txhash_to_txnum_map(tx_hash, tx_num) tx_num += 1 # Spend the inputs @@ -804,12 +842,21 @@ def advance_txs(self, txs, is_unspendable): cache_value = spend_utxo(txin.prev_hash, txin.prev_idx) undo_info_append(cache_value) add_hashXs(cache_value[:HASHX_LEN]) + prevout_tuple = (txin.prev_hash, txin.prev_idx) + put_txo_to_spender_map(prevout_tuple, tx_hash) + add_touched_outpoint(prevout_tuple) # Update touched set for notifications for hashXs in hashXs_by_tx: - update_touched(hashXs) + update_touched_hashxs(hashXs) - self.db.history.add_unflushed(hashXs_by_tx, self.tx_count) + self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs)) + self.db.history.add_unflushed( + hashXs_by_tx=hashXs_by_tx, + first_tx_num=self.tx_count, + txhash_to_txnum_map=txhash_to_txnum_map, + txo_to_spender_map=txo_to_spender_map, + ) self.tx_count = tx_num self.db.tx_counts.append(tx_num) @@ -826,7 +873,8 @@ def backup_txs(self, txs, is_unspendable): # Use local vars for speed in the loops put_utxo = self.utxo_cache.__setitem__ spend_utxo = self.spend_utxo - add_touched = self.touched.add + add_touched_hashx = self.touched_hashxs.add + add_touched_outpoint = self.touched_outpoints.add undo_entry_len = HASHX_LEN + TXNUM_LEN + 8 # Restore coins that had been spent @@ -837,8 +885,10 @@ def backup_txs(self, txs, is_unspendable): if txin.is_generation(): continue undo_item = undo_info[n:n + undo_entry_len] - put_utxo(txin.prev_hash + pack_le_uint32(txin.prev_idx), undo_item) - add_touched(undo_item[:HASHX_LEN]) + prevout = txin.prev_hash + pack_le_uint32(txin.prev_idx)[:TXOUTIDX_LEN] + put_utxo(prevout, undo_item) + 
add_touched_hashx(undo_item[:HASHX_LEN]) + add_touched_outpoint((txin.prev_hash, txin.prev_idx)) n += undo_entry_len assert n == len(undo_info) @@ -854,6 +904,8 @@ def backup_txs(self, txs, is_unspendable): # Get the hashX cache_value = spend_utxo(tx_hash, idx) hashX = cache_value[:HASHX_LEN] - add_touched(hashX) + add_touched_hashx(hashX) + add_touched_outpoint((tx_hash, idx)) + self.undo_tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs)) self.tx_count -= len(txs) diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index 032159fcb..1801a6226 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -6,6 +6,7 @@ # and warranty status of this software. from asyncio import Event +from typing import Set, Dict, Tuple from aiorpcx import _version as aiorpcx_version, TaskGroup @@ -31,43 +32,83 @@ class Notifications: # notifications appropriately. def __init__(self): - self._touched_mp = {} - self._touched_bp = {} + self._touched_hashxs_mp = {} # type: Dict[int, Set[bytes]] + self._touched_hashxs_bp = {} # type: Dict[int, Set[bytes]] + self._touched_outpoints_mp = {} # type: Dict[int, Set[Tuple[bytes, int]]] + self._touched_outpoints_bp = {} # type: Dict[int, Set[Tuple[bytes, int]]] self._highest_block = -1 async def _maybe_notify(self): - tmp, tbp = self._touched_mp, self._touched_bp - common = set(tmp).intersection(tbp) - if common: - height = max(common) - elif tmp and max(tmp) == self._highest_block: + th_mp, th_bp = self._touched_hashxs_mp, self._touched_hashxs_bp + # figure out block height + common_heights = set(th_mp).intersection(th_bp) + if common_heights: + height = max(common_heights) + elif th_mp and max(th_mp) == self._highest_block: height = self._highest_block else: # Either we are processing a block and waiting for it to # come in, or we have not yet had a mempool update for the # new block height return - touched = tmp.pop(height) - for old in [h for h in tmp if h <= height]: - del tmp[old] - for old in [h for h in tbp if h <= height]: - touched.update(tbp.pop(old)) - await self.notify(height, touched) - - async def notify(self, height, touched): + # hashXs + touched_hashxs = th_mp.pop(height) + for old in [h for h in th_mp if h <= height]: + del th_mp[old] + for old in [h for h in th_bp if h <= height]: + touched_hashxs.update(th_bp.pop(old)) + # outpoints + to_mp, to_bp = self._touched_outpoints_mp, self._touched_outpoints_bp + touched_outpoints = to_mp.pop(height) + for old in [h for h in to_mp if h <= height]: + del to_mp[old] + for old in [h for h in to_bp if h <= height]: + touched_outpoints.update(to_bp.pop(old)) + + await self.notify( + height=height, + touched_hashxs=touched_hashxs, + touched_outpoints=touched_outpoints, + ) + + async def notify( + self, + *, + touched_hashxs: Set[bytes], + touched_outpoints: Set[Tuple[bytes, int]], + height: int, + ): pass - async def start(self, height, notify_func): + async def start(self, height: int, notify_func): self._highest_block = height self.notify = notify_func - await self.notify(height, set()) - - async def on_mempool(self, touched, height): - self._touched_mp[height] = touched + await self.notify( + height=height, + touched_hashxs=set(), + touched_outpoints=set(), + ) + + async def on_mempool( + self, + *, + touched_hashxs: Set[bytes], + touched_outpoints: Set[Tuple[bytes, int]], + height: int, + ): + self._touched_hashxs_mp[height] = touched_hashxs + self._touched_outpoints_mp[height] = touched_outpoints await self._maybe_notify() - async def on_block(self, 
touched, height): - self._touched_bp[height] = touched + async def on_block( + self, + *, + touched_hashxs: Set[bytes], + touched_outpoints: Set[Tuple[bytes, int]], + height: int, + ): + self._touched_hashxs_bp[height] = touched_hashxs + self._touched_outpoints_bp[height] = touched_outpoints self._highest_block = height await self._maybe_notify() diff --git a/electrumx/server/daemon.py b/electrumx/server/daemon.py index c58e1884c..eec73bef7 100644 --- a/electrumx/server/daemon.py +++ b/electrumx/server/daemon.py @@ -277,11 +277,16 @@ async def relayfee(self): network_info = await self.getnetworkinfo() return network_info['relayfee'] - async def getrawtransaction(self, hex_hash, verbose=False): + async def getrawtransaction(self, hex_hash, verbose=False, blockhash=None): '''Return the serialized raw transaction with the given hash.''' # Cast to int because some coin daemons are old and require it - return await self._send_single('getrawtransaction', - (hex_hash, int(verbose))) + verbose = int(verbose) + if blockhash is None: + return await self._send_single('getrawtransaction', (hex_hash, verbose)) + else: + # given a blockhash, modern bitcoind can lookup the tx even without txindex: + # https://github.com/bitcoin/bitcoin/pull/10275 + return await self._send_single('getrawtransaction', (hex_hash, verbose, blockhash)) async def getrawtransactions(self, hex_hashes, replace_errs=True): '''Return the serialized raw transactions with the given hashes. diff --git a/electrumx/server/db.py b/electrumx/server/db.py index fae1885d6..deac8e7ff 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -16,7 +16,7 @@ from bisect import bisect_right from dataclasses import dataclass from glob import glob -from typing import Dict, List, Sequence, Tuple, Optional, TYPE_CHECKING +from typing import Dict, List, Sequence, Tuple, Optional, TYPE_CHECKING, Union import attr from aiorpcx import run_in_thread, sleep @@ -28,8 +28,11 @@ formatted_time, pack_be_uint16, pack_be_uint32, pack_le_uint64, pack_le_uint32, unpack_le_uint32, unpack_be_uint32, unpack_le_uint64 ) +from electrumx.lib.tx import TXOSpendStatus from electrumx.server.storage import db_class, Storage -from electrumx.server.history import History, TXNUM_LEN +from electrumx.server.history import ( + History, TXNUM_LEN, TXOUTIDX_LEN, TXOUTIDX_PADDING, pack_txnum, unpack_txnum, +) if TYPE_CHECKING: from electrumx.server.env import Env @@ -50,7 +53,9 @@ class FlushData: height = attr.ib() tx_count = attr.ib() headers = attr.ib() - block_tx_hashes = attr.ib() + block_tx_hashes = attr.ib() # type: List[bytes] + undo_block_tx_hashes = attr.ib() # type: List[bytes] + undo_historical_spends = attr.ib() # type: List[bytes] # The following are flushed to the UTXO DB if undo_infos is not None undo_infos = attr.ib() # type: List[Tuple[Sequence[bytes], int]] adds = attr.ib() # type: Dict[bytes, bytes] # txid+out_idx -> hashX+tx_num+value_sats @@ -58,9 +63,6 @@ class FlushData: tip = attr.ib() -COMP_TXID_LEN = 4 - - class DB: '''Simple wrapper of the backend database for querying. @@ -68,7 +70,7 @@ class DB: it was shutdown uncleanly. 
''' - DB_VERSIONS = (6, 7, 8) + DB_VERSIONS = (9, ) utxo_db: Optional['Storage'] @@ -94,11 +96,11 @@ def __init__(self, env: 'Env'): self.db_class = db_class(self.env.db_engine) self.history = History() - # Key: b'u' + address_hashX + txout_idx + tx_num + # Key: b'u' + address_hashX + tx_num + txout_idx # Value: the UTXO value as a 64-bit unsigned integer (in satoshis) # "at address, at outpoint, there is a UTXO of value v" # --- - # Key: b'h' + compressed_tx_hash + txout_idx + tx_num + # Key: b'h' + tx_num + txout_idx # Value: hashX # "some outpoint created a UTXO at address" # --- @@ -107,13 +109,11 @@ def __init__(self, env: 'Env'): # "undo data: list of UTXOs spent at block height" self.utxo_db = None - self.utxo_flush_count = 0 self.fs_height = -1 self.fs_tx_count = 0 self.db_height = -1 self.db_tx_count = 0 self.db_tip = None # type: Optional[bytes] - self.tx_counts = None self.last_flush = time.time() self.last_flush_tx_count = 0 self.wall_time = 0 @@ -129,6 +129,7 @@ def __init__(self, env: 'Env'): # on-disk: raw block headers in chain order self.headers_file = util.LogicalFile('meta/headers', 2, 16000000) # on-disk: cumulative number of txs at the end of height N + self.tx_counts = None # type: Optional[array] self.tx_counts_file = util.LogicalFile('meta/txcounts', 2, 2000000) # on-disk: 32 byte txids in chain order, allows (tx_num -> txid) map self.hashes_file = util.LogicalFile('meta/hashes', 4, 16000000) @@ -150,7 +151,7 @@ async def _read_tx_counts(self): else: assert self.db_tx_count == 0 - async def _open_dbs(self, for_sync: bool, compacting: bool): + async def _open_dbs(self, *, for_sync: bool): assert self.utxo_db is None # First UTXO DB @@ -169,17 +170,16 @@ async def _open_dbs(self, for_sync: bool, compacting: bool): self.read_utxo_state() # Then history DB - self.utxo_flush_count = self.history.open_db(self.db_class, for_sync, - self.utxo_flush_count, - compacting) + self.history.open_db( + db_class=self.db_class, + for_sync=for_sync, + utxo_db_tx_count=self.db_tx_count, + ) self.clear_excess_undo_info() # Read TX counts (requires meta directory) await self._read_tx_counts() - async def open_for_compacting(self): - await self._open_dbs(True, True) - async def open_for_sync(self): '''Open the databases to sync to the daemon. @@ -187,7 +187,7 @@ async def open_for_sync(self): synchronization. When serving clients we want the open files for serving network connections. ''' - await self._open_dbs(True, False) + await self._open_dbs(for_sync=True) async def open_for_serving(self): '''Open the databases for serving. 
If they are already open they are @@ -198,7 +198,7 @@ async def open_for_serving(self): self.utxo_db.close() self.history.close_db() self.utxo_db = None - await self._open_dbs(False, False) + await self._open_dbs(for_sync=False) # Header merkle cache @@ -214,19 +214,21 @@ async def header_branch_and_root(self, length, height): return await self.header_mc.branch_and_root(length, height) # Flushing - def assert_flushed(self, flush_data): + def assert_flushed(self, flush_data: FlushData): '''Asserts state is fully flushed.''' assert flush_data.tx_count == self.fs_tx_count == self.db_tx_count assert flush_data.height == self.fs_height == self.db_height assert flush_data.tip == self.db_tip assert not flush_data.headers assert not flush_data.block_tx_hashes + assert not flush_data.undo_block_tx_hashes + assert not flush_data.undo_historical_spends assert not flush_data.adds assert not flush_data.deletes assert not flush_data.undo_infos self.history.assert_flushed() - def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining): + def flush_dbs(self, flush_data: FlushData, flush_utxos, estimate_txs_remaining): '''Flush out cached state. History is always flushed; UTXOs are flushed if flush_utxos.''' if flush_data.height == self.db_height: @@ -254,7 +256,7 @@ def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining): self.flush_state(self.utxo_db) elapsed = self.last_flush - start_time - self.logger.info(f'flush #{self.history.flush_count:,d} took ' + self.logger.info(f'flush took ' f'{elapsed:.1f}s. Height {flush_data.height:,d} ' f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') @@ -269,7 +271,7 @@ def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining): self.logger.info(f'sync time: {formatted_time(self.wall_time)} ' f'ETA: {formatted_time(eta)}') - def flush_fs(self, flush_data): + def flush_fs(self, flush_data: FlushData): '''Write headers, tx counts and block tx hashes to the filesystem. The first height to write is self.fs_height + 1. The FS @@ -332,11 +334,11 @@ def flush_utxo_db(self, batch, flush_data: FlushData): for key, value in flush_data.adds.items(): # key: txid+out_idx, value: hashX+tx_num+value_sats hashX = value[:HASHX_LEN] - txout_idx = key[-4:] + txout_idx = key[-TXOUTIDX_LEN:] tx_num = value[HASHX_LEN: HASHX_LEN+TXNUM_LEN] value_sats = value[-8:] - suffix = txout_idx + tx_num - batch_put(b'h' + key[:COMP_TXID_LEN] + suffix, hashX) + suffix = tx_num + txout_idx + batch_put(b'h' + suffix, hashX) batch_put(b'u' + hashX + suffix, value_sats) flush_data.adds.clear() @@ -353,7 +355,6 @@ def flush_utxo_db(self, batch, flush_data: FlushData): f'{spend_count:,d} spends in ' f'{elapsed:.1f}s, committing...') - self.utxo_flush_count = self.history.flush_count self.db_height = flush_data.height self.db_tx_count = flush_data.tx_count self.db_tip = flush_data.tip @@ -366,25 +367,38 @@ def flush_state(self, batch): self.last_flush_tx_count = self.fs_tx_count self.write_utxo_state(batch) - def flush_backup(self, flush_data, touched): + def flush_backup(self, flush_data: FlushData, touched_hashxs): '''Like flush_dbs() but when backing up. 
All UTXOs are flushed.''' assert not flush_data.headers assert not flush_data.block_tx_hashes assert flush_data.height < self.db_height self.history.assert_flushed() + assert len(flush_data.undo_block_tx_hashes) == self.db_height - flush_data.height start_time = time.time() tx_delta = flush_data.tx_count - self.last_flush_tx_count + tx_hashes = [] + for block in flush_data.undo_block_tx_hashes: + tx_hashes += [*util.chunks(block, 32)] + flush_data.undo_block_tx_hashes.clear() + assert len(tx_hashes) == -tx_delta + self.backup_fs(flush_data.height, flush_data.tx_count) - self.history.backup(touched, flush_data.tx_count) + self.history.backup( + hashXs=touched_hashxs, + tx_count=flush_data.tx_count, + tx_hashes=tx_hashes, + spends=flush_data.undo_historical_spends, + ) + flush_data.undo_historical_spends.clear() with self.utxo_db.write_batch() as batch: self.flush_utxo_db(batch, flush_data) # Flush state last as it reads the wall time. self.flush_state(batch) elapsed = self.last_flush - start_time - self.logger.info(f'backup flush #{self.history.flush_count:,d} took ' + self.logger.info(f'backup flush took ' f'{elapsed:.1f}s. Height {flush_data.height:,d} ' f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') @@ -448,7 +462,7 @@ def read_headers(): return await run_in_thread(read_headers) - def fs_tx_hash(self, tx_num): + def fs_tx_hash(self, tx_num: int) -> Tuple[Optional[bytes], int]: '''Return a pair (tx_hash, tx_height) for the given tx number. If the tx_height is not on disk, returns (None, tx_height).''' @@ -492,26 +506,117 @@ async def fs_block_hashes(self, height, count): return [self.coin.header_hash(header) for header in headers] - async def limited_history(self, hashX, *, limit=1000): - '''Return an unpruned, sorted list of (tx_hash, height) tuples of + async def limited_history_triples( + self, + *, + hashX: bytes, + limit: Optional[int] = 1000, + txnum_min: Optional[int] = None, + txnum_max: Optional[int] = None, + ) -> Sequence[Tuple[bytes, int, int]]: + '''Return an unpruned, sorted list of (tx_hash, height, tx_num) tuples of confirmed transactions that touched the address, earliest in the blockchain first. Includes both spending and receiving transactions. By default returns at most 1000 entries. Set limit to None to get them all. + txnum_min can be used to seek into the history and start there (>=) (instead of genesis). + txnum_max can be used to stop early (<). 
''' def read_history(): - tx_nums = list(self.history.get_txnums(hashX, limit)) + tx_nums = list(self.history.get_txnums( + hashX=hashX, limit=limit, txnum_min=txnum_min, txnum_max=txnum_max)) fs_tx_hash = self.fs_tx_hash - return [fs_tx_hash(tx_num) for tx_num in tx_nums] + return [(*fs_tx_hash(tx_num), tx_num) for tx_num in tx_nums] while True: history = await run_in_thread(read_history) - if all(hash is not None for hash, height in history): + if all(tx_hash is not None for tx_hash, height, tx_num in history): return history self.logger.warning(f'limited_history: tx hash ' f'not found (reorg?), retrying...') await sleep(0.25) + async def limited_history( + self, + *, + hashX: bytes, + limit: Optional[int] = 1000, + txnum_min: Optional[int] = None, + txnum_max: Optional[int] = None, + ) -> Sequence[Tuple[bytes, int]]: + '''Return a list of (tx_hash, height) tuples of confirmed txs that touched hashX.''' + triples = await self.limited_history_triples( + hashX=hashX, limit=limit, txnum_min=txnum_min, txnum_max=txnum_max) + return [(tx_hash, height) for (tx_hash, height, tx_num) in triples] + + def fs_txnum_for_txhash(self, tx_hash: bytes) -> Optional[int]: + return self.history.get_txnum_for_txhash(tx_hash) + + async def txnum_for_txhash(self, tx_hash: bytes) -> Optional[int]: + return await run_in_thread(self.fs_txnum_for_txhash, tx_hash) + + async def get_blockheight_and_txpos_for_txhash( + self, tx_hash: bytes, + ) -> Tuple[Optional[int], Optional[int]]: + '''Returns (block_height, tx_pos) for a confirmed tx_hash.''' + tx_num = await self.txnum_for_txhash(tx_hash) + if tx_num is None: + return None, None + return self.get_blockheight_and_txpos_for_txnum(tx_num) + + def get_blockheight_and_txpos_for_txnum( + self, tx_num: int, + ) -> Tuple[Optional[int], Optional[int]]: + '''Returns (block_height, tx_pos) for a tx_num.''' + height = bisect_right(self.tx_counts, tx_num) + if height > self.db_height: + return None, None + assert height > 0 + tx_pos = tx_num - self.tx_counts[height - 1] + return height, tx_pos + + def get_next_tx_num_after_blockheight(self, height: int) -> Optional[int]: + '''For given block height, returns the tx_num of the coinbase tx at height+1. + That is, all txs at height are guaranteed to have tx_num < return value. + ''' + # tx_counts[N] has the cumulative number of txs at the end of + # height N. So tx_counts[0] is 1 - the genesis coinbase + assert height >= 0, f"height must non-negative, not {height}" + if len(self.tx_counts) < height: + return None + return self.tx_counts[height] + + def fs_spender_for_txo(self, prev_txhash: bytes, txout_idx: int) -> 'TXOSpendStatus': + '''For an outpoint, returns its spend-status (considering only the DB, + not the mempool). + ''' + prev_txnum = self.fs_txnum_for_txhash(prev_txhash) + if prev_txnum is None: # outpoint never existed (in chain) + return TXOSpendStatus(prev_height=None) + prev_height = self.get_blockheight_and_txpos_for_txnum(prev_txnum)[0] + hashx, _ = self._get_hashX_for_utxo(prev_txhash, txout_idx) + if hashx: # outpoint exists and is unspent + return TXOSpendStatus(prev_height=prev_height) + # by now we know prev_txhash was mined, and + # txout_idx was either spent or is out-of-bounds + spender_txnum = self.history.get_spender_txnum_for_txo(prev_txnum, txout_idx) + if spender_txnum is None: + # txout_idx was out-of-bounds + return TXOSpendStatus(prev_height=None) + # by now we know the outpoint exists and it was spent. 
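+        # fs_tx_hash maps the spender tx_num back to (tx_hash, block height) via the on-disk tx-hash store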
+ spender_txhash, spender_height = self.fs_tx_hash(spender_txnum) + if spender_txhash is None: + # not sure if this can happen. maybe through a race? + return TXOSpendStatus(prev_height=prev_height) + return TXOSpendStatus( + prev_height=prev_height, + spender_txhash=spender_txhash, + spender_height=spender_height, + ) + + async def spender_for_txo(self, prev_txhash: bytes, txout_idx: int) -> 'TXOSpendStatus': + return await run_in_thread(self.fs_spender_for_txo, prev_txhash, txout_idx) + # -- Undo information def min_undo_height(self, max_height): @@ -589,13 +694,12 @@ def clear_excess_undo_info(self): # -- UTXO database def read_utxo_state(self): - state = self.utxo_db.get(b'state') + state = self.utxo_db.get(b'\0state') if not state: self.db_height = -1 self.db_tx_count = 0 self.db_tip = b'\0' * 32 self.db_version = max(self.DB_VERSIONS) - self.utxo_flush_count = 0 self.wall_time = 0 self.first_sync = True else: @@ -617,7 +721,6 @@ def read_utxo_state(self): self.db_height = state['height'] self.db_tx_count = state['tx_count'] self.db_tip = state['tip'] - self.utxo_flush_count = state['utxo_flush_count'] self.wall_time = state['wall_time'] self.first_sync = state['first_sync'] @@ -628,7 +731,7 @@ def read_utxo_state(self): # Upgrade DB if self.db_version != max(self.DB_VERSIONS): - self.upgrade_db() + pass # call future upgrade logic here # Log some stats self.logger.info(f'UTXO DB version: {self.db_version:d}') @@ -644,90 +747,6 @@ def read_utxo_state(self): f'sync time so far: {util.formatted_time(self.wall_time)}' ) - def upgrade_db(self): - self.logger.info(f'UTXO DB version: {self.db_version}') - self.logger.info('Upgrading your DB; this can take some time...') - - def upgrade_u_prefix(prefix): - count = 0 - with self.utxo_db.write_batch() as batch: - batch_delete = batch.delete - batch_put = batch.put - # Key: b'u' + address_hashX + tx_idx + tx_num - for db_key, db_value in self.utxo_db.iterator(prefix=prefix): - if len(db_key) == 21: - return - break - if self.db_version == 6: - for db_key, db_value in self.utxo_db.iterator(prefix=prefix): - count += 1 - batch_delete(db_key) - batch_put(db_key[:14] + b'\0\0' + db_key[14:] + b'\0', db_value) - else: - for db_key, db_value in self.utxo_db.iterator(prefix=prefix): - count += 1 - batch_delete(db_key) - batch_put(db_key + b'\0', db_value) - return count - - last = time.monotonic() - count = 0 - for cursor in range(65536): - prefix = b'u' + pack_be_uint16(cursor) - count += upgrade_u_prefix(prefix) - now = time.monotonic() - if now > last + 10: - last = now - self.logger.info(f'DB 1 of 3: {count:,d} entries updated, ' - f'{cursor * 100 / 65536:.1f}% complete') - self.logger.info('DB 1 of 3 upgraded successfully') - - def upgrade_h_prefix(prefix): - count = 0 - with self.utxo_db.write_batch() as batch: - batch_delete = batch.delete - batch_put = batch.put - # Key: b'h' + compressed_tx_hash + tx_idx + tx_num - for db_key, db_value in self.utxo_db.iterator(prefix=prefix): - if len(db_key) == 14: - return - break - if self.db_version == 6: - for db_key, db_value in self.utxo_db.iterator(prefix=prefix): - count += 1 - batch_delete(db_key) - batch_put(db_key[:7] + b'\0\0' + db_key[7:] + b'\0', db_value) - else: - for db_key, db_value in self.utxo_db.iterator(prefix=prefix): - count += 1 - batch_delete(db_key) - batch_put(db_key + b'\0', db_value) - return count - - last = time.monotonic() - count = 0 - for cursor in range(65536): - prefix = b'h' + pack_be_uint16(cursor) - count += upgrade_h_prefix(prefix) - now = time.monotonic() - if now > 
last + 10: - last = now - self.logger.info(f'DB 2 of 3: {count:,d} entries updated, ' - f'{cursor * 100 / 65536:.1f}% complete') - - # Upgrade tx_counts file - size = (self.db_height + 1) * 8 - tx_counts = self.tx_counts_file.read(0, size) - if len(tx_counts) == (self.db_height + 1) * 4: - tx_counts = array('I', tx_counts) - tx_counts = array('Q', tx_counts) - self.tx_counts_file.write(0, tx_counts.tobytes()) - - self.db_version = max(self.DB_VERSIONS) - with self.utxo_db.write_batch() as batch: - self.write_utxo_state(batch) - self.logger.info('DB 2 of 3 upgraded successfully') - def write_utxo_state(self, batch): '''Write (UTXO) state to the batch.''' state = { @@ -735,30 +754,24 @@ def write_utxo_state(self, batch): 'height': self.db_height, 'tx_count': self.db_tx_count, 'tip': self.db_tip, - 'utxo_flush_count': self.utxo_flush_count, 'wall_time': self.wall_time, 'first_sync': self.first_sync, 'db_version': self.db_version, } - batch.put(b'state', repr(state).encode()) - - def set_flush_count(self, count): - self.utxo_flush_count = count - with self.utxo_db.write_batch() as batch: - self.write_utxo_state(batch) + batch.put(b'\0state', repr(state).encode()) async def all_utxos(self, hashX): '''Return all UTXOs for an address sorted in no particular order.''' def read_utxos(): utxos = [] utxos_append = utxos.append - txnum_padding = bytes(8-TXNUM_LEN) - # Key: b'u' + address_hashX + txout_idx + tx_num + # Key: b'u' + address_hashX + tx_num + txout_idx # Value: the UTXO value as a 64-bit unsigned integer prefix = b'u' + hashX for db_key, db_value in self.utxo_db.iterator(prefix=prefix): - txout_idx, = unpack_le_uint32(db_key[-TXNUM_LEN-4:-TXNUM_LEN]) - tx_num, = unpack_le_uint64(db_key[-TXNUM_LEN:] + txnum_padding) + txout_idx, = unpack_le_uint32(db_key[-TXOUTIDX_LEN:] + TXOUTIDX_PADDING) + tx_numb = db_key[-TXOUTIDX_LEN-TXNUM_LEN:-TXOUTIDX_LEN] + tx_num = unpack_txnum(tx_numb) value, = unpack_le_uint64(db_value) tx_hash, height = self.fs_tx_hash(tx_num) utxos_append(UTXO(tx_num, txout_idx, tx_hash, height, value)) @@ -772,6 +785,23 @@ def read_utxos(): f'found (reorg?), retrying...') await sleep(0.25) + def _get_hashX_for_utxo( + self, tx_hash: bytes, txout_idx: int, + ) -> Tuple[Optional[bytes], Optional[bytes]]: + idx_packed = pack_le_uint32(txout_idx)[:TXOUTIDX_LEN] + tx_num = self.fs_txnum_for_txhash(tx_hash) + if tx_num is None: + return None, None + tx_numb = pack_txnum(tx_num) + + # Key: b'h' + tx_num + txout_idx + # Value: hashX + db_key = b'h' + tx_numb + idx_packed + hashX = self.utxo_db.get(db_key) + if hashX is None: + return None, None + return hashX, tx_numb + idx_packed + async def lookup_utxos(self, prevouts): '''For each prevout, lookup it up in the DB and return a (hashX, value) pair or None if not found. @@ -782,22 +812,7 @@ def lookup_hashXs(): '''Return (hashX, suffix) pairs, or None if not found, for each prevout. ''' - def lookup_hashX(tx_hash, tx_idx): - idx_packed = pack_le_uint32(tx_idx) - txnum_padding = bytes(8-TXNUM_LEN) - - # Key: b'h' + compressed_tx_hash + tx_idx + tx_num - # Value: hashX - prefix = b'h' + tx_hash[:COMP_TXID_LEN] + idx_packed - - # Find which entry, if any, the TX_HASH matches. 
- for db_key, hashX in self.utxo_db.iterator(prefix=prefix): - tx_num_packed = db_key[-TXNUM_LEN:] - tx_num, = unpack_le_uint64(tx_num_packed + txnum_padding) - hash, _height = self.fs_tx_hash(tx_num) - if hash == tx_hash: - return hashX, idx_packed + tx_num_packed - return None, None + lookup_hashX = self._get_hashX_for_utxo return [lookup_hashX(*prevout) for prevout in prevouts] def lookup_utxos(hashX_pairs): @@ -807,7 +822,7 @@ def lookup_utxo(hashX, suffix): # of us and has mempool txs spending outputs from # that new block return None - # Key: b'u' + address_hashX + tx_idx + tx_num + # Key: b'u' + address_hashX + tx_num + txout_idx # Value: the UTXO value as a 64-bit unsigned integer key = b'u' + hashX + suffix db_value = self.utxo_db.get(key) diff --git a/electrumx/server/env.py b/electrumx/server/env.py index 12735a1e0..c51e76cbb 100644 --- a/electrumx/server/env.py +++ b/electrumx/server/env.py @@ -72,10 +72,10 @@ def __init__(self, coin=None): self.log_level = self.default('LOG_LEVEL', 'info').upper() self.donation_address = self.default('DONATION_ADDRESS', '') self.drop_client = self.custom("DROP_CLIENT", None, re.compile) - self.drop_client_unknown = self.boolean('DROP_CLIENT_UNKNOWN', False) self.blacklist_url = self.default('BLACKLIST_URL', self.coin.BLACKLIST_URL) self.cache_MB = self.integer('CACHE_MB', 1200) self.reorg_limit = self.integer('REORG_LIMIT', self.coin.REORG_LIMIT) + self.daemon_has_txindex = self.boolean('DAEMON_HAS_TXINDEX', True) # Server limits to help prevent DoS diff --git a/electrumx/server/history.py b/electrumx/server/history.py index dc7dd8d54..450524489 100644 --- a/electrumx/server/history.py +++ b/electrumx/server/history.py @@ -9,62 +9,86 @@ '''History by script hash (address).''' import ast -import bisect import time -from array import array from collections import defaultdict -from typing import TYPE_CHECKING, Type, Optional +from typing import TYPE_CHECKING, Type, Optional, Dict, Sequence, Tuple, List +import itertools +from functools import partial + +from aiorpcx import run_in_thread import electrumx.lib.util as util from electrumx.lib.hash import HASHX_LEN, hash_to_hex_str -from electrumx.lib.util import (pack_be_uint16, pack_le_uint64, - unpack_be_uint16_from, unpack_le_uint64) +from electrumx.lib.util import (pack_le_uint64, unpack_le_uint64, + pack_le_uint32, unpack_le_uint32, + pack_be_uint64, unpack_be_uint64) if TYPE_CHECKING: from electrumx.server.storage import Storage TXNUM_LEN = 5 -FLUSHID_LEN = 2 +TXNUM_PADDING = bytes(8 - TXNUM_LEN) +TXOUTIDX_LEN = 3 +TXOUTIDX_PADDING = bytes(4 - TXOUTIDX_LEN) + + +def unpack_txnum(tx_numb: bytes) -> int: + return unpack_be_uint64(TXNUM_PADDING + tx_numb)[0] + + +def pack_txnum(tx_num: int) -> bytes: + return pack_be_uint64(tx_num)[-TXNUM_LEN:] class History: - DB_VERSIONS = (0, 1) + DB_VERSIONS = (3, ) + STORE_INTERMEDIATE_STATUSHASH_EVERY_N_TXS = 5000 db: Optional['Storage'] def __init__(self): self.logger = util.class_logger(__name__, self.__class__.__name__) - # For history compaction - self.max_hist_row_entries = 12500 - self.unflushed = defaultdict(bytearray) - self.unflushed_count = 0 - self.flush_count = 0 - self.comp_flush_count = -1 - self.comp_cursor = -1 + self.hist_db_tx_count = 0 + self.hist_db_tx_count_next = 0 # after next flush, next value for self.hist_db_tx_count self.db_version = max(self.DB_VERSIONS) self.upgrade_cursor = -1 - # Key: address_hashX + flush_id - # Value: sorted "list" of tx_nums in history of hashX + self._unflushed_hashxs = defaultdict(bytearray) # type: 
Dict[bytes, bytearray] + self._unflushed_hashxs_count = 0 + self._unflushed_txhash_to_txnum_map = {} # type: Dict[bytes, int] + self._unflushed_txo_to_spender = {} # type: Dict[bytes, int] # (tx_num+txout_idx)->tx_num + # hashX -> list of (tx_num, status): + self._unflushed_hashx_to_statushash = {} # type: Dict[bytes, List[Tuple[int, bytes]]] + self._unflushed_statushash_count = 0 + + # Key: b'H' + address_hashX + tx_num + # Value: + # --- + # Key: b't' + tx_hash + # Value: tx_num + # --- + # Key: b's' + tx_num + txout_idx + # Value: tx_num + # "which tx spent this TXO?" -- note that UTXOs are not stored. + # --- + # Key: b'S' + address_hashX + tx_num + # Value: status_hash + # Status hash of hashX including only txs up to tx_num. + # An append-only cache of partial statuses: only reorg-safe depths are stored. self.db = None def open_db( self, + *, db_class: Type['Storage'], for_sync: bool, - utxo_flush_count: int, - compacting: bool, - ): + utxo_db_tx_count: int, + ) -> None: self.db = db_class('hist', for_sync) self.read_state() - self.clear_excess(utxo_flush_count) - # An incomplete compaction needs to be cancelled otherwise - # restarting it will corrupt the history - if not compacting: - self._cancel_compaction() - return self.flush_count + self.clear_excess(utxo_db_tx_count) def close_db(self): if self.db: @@ -72,22 +96,15 @@ def close_db(self): self.db = None def read_state(self): - state = self.db.get(b'state\0\0') + state = self.db.get(b'\0state') if state: state = ast.literal_eval(state.decode()) if not isinstance(state, dict): raise RuntimeError('failed reading state from history DB') - self.flush_count = state['flush_count'] - self.comp_flush_count = state.get('comp_flush_count', -1) - self.comp_cursor = state.get('comp_cursor', -1) self.db_version = state.get('db_version', 0) self.upgrade_cursor = state.get('upgrade_cursor', -1) - else: - self.flush_count = 0 - self.comp_flush_count = -1 - self.comp_cursor = -1 - self.db_version = max(self.DB_VERSIONS) - self.upgrade_cursor = -1 + self.hist_db_tx_count = state.get('hist_db_tx_count', 0) + self.hist_db_tx_count_next = self.hist_db_tx_count if self.db_version not in self.DB_VERSIONS: msg = (f'your history DB version is {self.db_version} but ' @@ -95,30 +112,53 @@ def read_state(self): self.logger.error(msg) raise RuntimeError(msg) if self.db_version != max(self.DB_VERSIONS): - self.upgrade_db() + pass # call future upgrade logic here self.logger.info(f'history DB version: {self.db_version}') - self.logger.info(f'flush count: {self.flush_count:,d}') - def clear_excess(self, utxo_flush_count): - # < might happen at end of compaction as both DBs cannot be - # updated atomically - if self.flush_count <= utxo_flush_count: + def clear_excess(self, utxo_db_tx_count: int) -> None: + # self.hist_db_tx_count != utxo_db_tx_count might happen as + # both DBs cannot be updated atomically + # FIXME when advancing blocks, hist_db is flushed first, so its count can be higher; + # but when backing up (e.g. reorg), hist_db is flushed first as well, + # so its count can be lower?! + # Shouldn't we flush utxo_db first when backing up? + if self.hist_db_tx_count <= utxo_db_tx_count: + assert self.hist_db_tx_count == utxo_db_tx_count return self.logger.info('DB shut down uncleanly. 
Scanning for ' 'excess history flushes...') - keys = [] - for key, _hist in self.db.iterator(prefix=b''): - flush_id, = unpack_be_uint16_from(key[-FLUSHID_LEN:]) - if flush_id > utxo_flush_count: - keys.append(key) - - self.logger.info(f'deleting {len(keys):,d} history entries') - - self.flush_count = utxo_flush_count + hkeys = [] + for db_key, db_val in self.db.iterator(prefix=b'H'): + tx_numb = db_key[-TXNUM_LEN:] + tx_num = unpack_txnum(tx_numb) + if tx_num >= utxo_db_tx_count: + hkeys.append(db_key) + + tkeys = [] + for db_key, db_val in self.db.iterator(prefix=b't'): + tx_numb = db_val + tx_num = unpack_txnum(tx_numb) + if tx_num >= utxo_db_tx_count: + tkeys.append(db_key) + + skeys = [] + for db_key, db_val in self.db.iterator(prefix=b's'): + tx_numb1 = db_key[1:1+TXNUM_LEN] + tx_numb2 = db_val + tx_num1 = unpack_txnum(tx_numb1) + tx_num2 = unpack_txnum(tx_numb2) + if max(tx_num1, tx_num2) >= utxo_db_tx_count: + skeys.append(db_key) + + self.logger.info(f'deleting {len(hkeys):,d} addr entries,' + f' {len(tkeys):,d} txs, and {len(skeys):,d} spends') + + self.hist_db_tx_count = utxo_db_tx_count + self.hist_db_tx_count_next = self.hist_db_tx_count with self.db.write_batch() as batch: - for key in keys: + for key in itertools.chain(hkeys, tkeys, skeys): batch.delete(key) self.write_state(batch) @@ -127,278 +167,259 @@ def clear_excess(self, utxo_flush_count): def write_state(self, batch): '''Write state to the history DB.''' state = { - 'flush_count': self.flush_count, - 'comp_flush_count': self.comp_flush_count, - 'comp_cursor': self.comp_cursor, + 'hist_db_tx_count': self.hist_db_tx_count, 'db_version': self.db_version, 'upgrade_cursor': self.upgrade_cursor, } - # History entries are not prefixed; the suffix \0\0 ensures we - # look similar to other entries and aren't interfered with - batch.put(b'state\0\0', repr(state).encode()) + batch.put(b'\0state', repr(state).encode()) - def add_unflushed(self, hashXs_by_tx, first_tx_num): - unflushed = self.unflushed + def add_unflushed( + self, + *, + hashXs_by_tx: Sequence[Sequence[bytes]], + first_tx_num: int, + txhash_to_txnum_map: Dict[bytes, int], + txo_to_spender_map: Dict[Tuple[bytes, int], bytes], # (tx_hash, txout_idx) -> tx_hash + ): + unflushed_hashxs = self._unflushed_hashxs count = 0 + tx_num = None for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num): - tx_numb = pack_le_uint64(tx_num)[:TXNUM_LEN] + tx_numb = pack_txnum(tx_num) hashXs = set(hashXs) for hashX in hashXs: - unflushed[hashX] += tx_numb + unflushed_hashxs[hashX] += tx_numb count += len(hashXs) - self.unflushed_count += count + self._unflushed_hashxs_count += count + if tx_num is not None: + assert self.hist_db_tx_count_next + len(hashXs_by_tx) == tx_num + 1 + self.hist_db_tx_count_next = tx_num + 1 + + self._unflushed_txhash_to_txnum_map.update(txhash_to_txnum_map) + + unflushed_spenders = self._unflushed_txo_to_spender + get_txnum_for_txhash = self.get_txnum_for_txhash + for (prev_hash, prev_idx), spender_hash in txo_to_spender_map.items(): + prev_txnum = get_txnum_for_txhash(prev_hash) + assert prev_txnum is not None + spender_txnum = get_txnum_for_txhash(spender_hash) + assert spender_txnum is not None + prev_idx_packed = pack_le_uint32(prev_idx)[:TXOUTIDX_LEN] + prev_txnumb = pack_txnum(prev_txnum) + unflushed_spenders[prev_txnumb+prev_idx_packed] = spender_txnum def unflushed_memsize(self): - return len(self.unflushed) * 180 + self.unflushed_count * TXNUM_LEN + # note: the magic numbers here were estimated using util.deep_getsizeof + hashXs = 
len(self._unflushed_hashxs) * 180 + self._unflushed_hashxs_count * TXNUM_LEN + txs = 232 + 93 * len(self._unflushed_txhash_to_txnum_map) + spenders = 102 + 113 * len(self._unflushed_txo_to_spender) + statushashes = (232 + 100 * len(self._unflushed_hashx_to_statushash) + + 161 * self._unflushed_statushash_count) + return hashXs + txs + spenders + statushashes def assert_flushed(self): - assert not self.unflushed + assert not self._unflushed_hashxs + assert not self._unflushed_txhash_to_txnum_map + assert not self._unflushed_txo_to_spender def flush(self): start_time = time.monotonic() - self.flush_count += 1 - flush_id = pack_be_uint16(self.flush_count) - unflushed = self.unflushed + unflushed_hashxs = self._unflushed_hashxs + chunks = util.chunks with self.db.write_batch() as batch: - for hashX in sorted(unflushed): - key = hashX + flush_id - batch.put(key, bytes(unflushed[hashX])) + for hashX in sorted(unflushed_hashxs): + for tx_num in sorted(chunks(unflushed_hashxs[hashX], TXNUM_LEN)): + db_key = b'H' + hashX + tx_num + batch.put(db_key, b'') + for tx_hash, tx_num in sorted(self._unflushed_txhash_to_txnum_map.items()): + db_key = b't' + tx_hash + tx_numb = pack_txnum(tx_num) + batch.put(db_key, tx_numb) + for prevout, spender_txnum in sorted(self._unflushed_txo_to_spender.items()): + db_key = b's' + prevout + db_val = pack_txnum(spender_txnum) + batch.put(db_key, db_val) + for hashX, lst in sorted(self._unflushed_hashx_to_statushash.items()): + for tx_num, status in lst: + db_key = b'S' + hashX + pack_txnum(tx_num) + batch.put(db_key, status) + self.hist_db_tx_count = self.hist_db_tx_count_next self.write_state(batch) - count = len(unflushed) - unflushed.clear() - self.unflushed_count = 0 + addr_count = len(unflushed_hashxs) + tx_count = len(self._unflushed_txhash_to_txnum_map) + spend_count = len(self._unflushed_txo_to_spender) + statushash_count = self._unflushed_statushash_count + unflushed_hashxs.clear() + self._unflushed_hashxs_count = 0 + self._unflushed_txhash_to_txnum_map.clear() + self._unflushed_txo_to_spender.clear() + self._unflushed_statushash_count = 0 + self._unflushed_hashx_to_statushash.clear() if self.db.for_sync: elapsed = time.monotonic() - start_time - self.logger.info(f'flushed history in {elapsed:.1f}s ' - f'for {count:,d} addrs') - - def backup(self, hashXs, tx_count): - # Not certain this is needed, but it doesn't hurt - self.flush_count += 1 - nremoves = 0 - bisect_left = bisect.bisect_left - chunks = util.chunks - - txnum_padding = bytes(8-TXNUM_LEN) + self.logger.info(f'flushed history in {elapsed:.1f}s, for: ' + f'{addr_count:,d} addrs, {tx_count:,d} txs, {spend_count:,d} spends, ' + f'{statushash_count:,d} statushashes') + + def backup(self, *, hashXs, tx_count, tx_hashes: Sequence[bytes], spends: Sequence[bytes]): + self.assert_flushed() + get_txnum_for_txhash = self.get_txnum_for_txhash + nremoves_addr = 0 with self.db.write_batch() as batch: for hashX in sorted(hashXs): deletes = [] - puts = {} - for key, hist in self.db.iterator(prefix=hashX, reverse=True): - a = array( - 'Q', - b''.join(item + txnum_padding for item in chunks(hist, TXNUM_LEN)) - ) - # Remove all history entries >= tx_count - idx = bisect_left(a, tx_count) - nremoves += len(a) - idx - if idx > 0: - puts[key] = hist[:TXNUM_LEN * idx] + prefix = b'H' + hashX + for db_key, db_val in self.db.iterator(prefix=prefix, reverse=True): + tx_numb = db_key[-TXNUM_LEN:] + tx_num = unpack_txnum(tx_numb) + if tx_num >= tx_count: + nremoves_addr += 1 + deletes.append(db_key) + else: + # note: we can 
break now, due to 'reverse=True' and txnums being big endian break - deletes.append(key) - for key in deletes: batch.delete(key) - for key, value in puts.items(): - batch.put(key, value) + for spend in spends: + prev_hash = spend[:32] + prev_idx = spend[32:] + assert len(prev_idx) == TXOUTIDX_LEN + prev_txnum = get_txnum_for_txhash(prev_hash) + assert prev_txnum is not None + prev_txnumb = pack_txnum(prev_txnum) + db_key = b's' + prev_txnumb + prev_idx + batch.delete(db_key) + for tx_hash in sorted(tx_hashes): + db_key = b't' + tx_hash + batch.delete(db_key) + self.hist_db_tx_count = tx_count + self.hist_db_tx_count_next = self.hist_db_tx_count self.write_state(batch) - self.logger.info(f'backing up removed {nremoves:,d} history entries') + self.logger.info(f'backing up history, removed {nremoves_addr:,d} addrs, ' + f'{len(tx_hashes):,d} txs, and {len(spends):,d} spends') - def get_txnums(self, hashX, limit=1000): + def get_txnums( + self, + *, + hashX: bytes, + limit: Optional[int] = 1000, + txnum_min: Optional[int] = None, + txnum_max: Optional[int] = None, + ): '''Generator that returns an unpruned, sorted list of tx_nums in the history of a hashX. Includes both spending and receiving transactions. By default yields at most 1000 entries. Set - limit to None to get them all. ''' + limit to None to get them all. + txnum_min can be used to seek into the history and start there (>=) (instead of genesis). + txnum_max can be used to stop early (<). + ''' limit = util.resolve_limit(limit) - chunks = util.chunks - txnum_padding = bytes(8-TXNUM_LEN) - for _key, hist in self.db.iterator(prefix=hashX): - for tx_numb in chunks(hist, TXNUM_LEN): - if limit == 0: - return - tx_num, = unpack_le_uint64(tx_numb + txnum_padding) - yield tx_num - limit -= 1 - - # - # History compaction - # - - # comp_cursor is a cursor into compaction progress. - # -1: no compaction in progress - # 0-65535: Compaction in progress; all prefixes < comp_cursor have - # been compacted, and later ones have not. - # 65536: compaction complete in-memory but not flushed - # - # comp_flush_count applies during compaction, and is a flush count - # for history with prefix < comp_cursor. flush_count applies - # to still uncompacted history. It is -1 when no compaction is - # taking place. 
Key suffixes up to and including comp_flush_count - # are used, so a parallel history flush must first increment this - # - # When compaction is complete and the final flush takes place, - # flush_count is reset to comp_flush_count, and comp_flush_count to -1 - - def _flush_compaction(self, cursor, write_items, keys_to_delete): - '''Flush a single compaction pass as a batch.''' - # Update compaction state - if cursor == 65536: - self.flush_count = self.comp_flush_count - self.comp_cursor = -1 - self.comp_flush_count = -1 + prefix = b'H' + hashX + it = self.db.iterator(prefix=prefix) + if txnum_min is not None: + it.seek(prefix + pack_txnum(txnum_min)) + txnum_min = txnum_min if txnum_min is not None else 0 + txnum_max = txnum_max if txnum_max is not None else float('inf') + assert txnum_min <= txnum_max, f"txnum_min={txnum_min}, txnum_max={txnum_max}" + for db_key, db_val in it: + tx_numb = db_key[-TXNUM_LEN:] + if limit == 0: + return + tx_num = unpack_txnum(tx_numb) + if tx_num >= txnum_max: + return + assert txnum_min <= tx_num < txnum_max, (f"txnum_min={txnum_min}, tx_num={tx_num}, " + f"txnum_max={txnum_max}") + yield tx_num + limit -= 1 + + def get_txnum_for_txhash(self, tx_hash: bytes) -> Optional[int]: + tx_num = self._unflushed_txhash_to_txnum_map.get(tx_hash) + if tx_num is None: + db_key = b't' + tx_hash + tx_numb = self.db.get(db_key) + if tx_numb: + tx_num = unpack_txnum(tx_numb) + return tx_num + + def get_spender_txnum_for_txo(self, prev_txnum: int, txout_idx: int) -> Optional[int]: + '''For an outpoint, returns the tx_num that spent it. + If the outpoint is unspent, or even if it never existed (!), returns None. + ''' + prev_idx_packed = pack_le_uint32(txout_idx)[:TXOUTIDX_LEN] + prev_txnumb = pack_txnum(prev_txnum) + prevout = prev_txnumb + prev_idx_packed + spender_txnum = self._unflushed_txhash_to_txnum_map.get(prevout) + if spender_txnum is None: + db_key = b's' + prevout + spender_txnumb = self.db.get(db_key) + if spender_txnumb: + spender_txnum = unpack_txnum(spender_txnumb) + return spender_txnum + + def fs_get_intermediate_statushash_for_hashx( + self, + *, + hashX: bytes, + txnum_max: int = None, + ) -> Tuple[int, bytes]: + '''For a hashX, returns (tx_num, status), with the latest stored statushash + and corresponding tx_num, where tx_num < txnum_max. + This can be used to efficiently calculate the status of a hashX as + only the txs mined after(>) tx_num will need to be hashed. + ''' + # first, search in-memory, among the unflushed statuses + unflushed_statushashes = self._unflushed_hashx_to_statushash.get(hashX, []) + if len(unflushed_statushashes) > 0: + for tx_num, status in reversed(unflushed_statushashes): + if txnum_max is None or tx_num < txnum_max: + return tx_num, status + # second, search in the on-disk DB + prefix = b'S' + hashX + it = self.db.iterator(prefix=prefix, reverse=True) + if txnum_max is not None: + it.seek(prefix + pack_txnum(txnum_max)) + for db_key, db_val in it: + tx_numb = db_key[-TXNUM_LEN:] + tx_num = unpack_txnum(tx_numb) + status = db_val + break else: - self.comp_cursor = cursor + tx_num = 0 + status = bytes(32) + return tx_num, status - # History DB. Flush compacted history and updated state - with self.db.write_batch() as batch: - # Important: delete first! The keyspace may overlap. 
- for key in keys_to_delete: - batch.delete(key) - for key, value in write_items: - batch.put(key, value) - self.write_state(batch) - - def _compact_hashX(self, hashX, hist_map, hist_list, - write_items, keys_to_delete): - '''Compres history for a hashX. hist_list is an ordered list of - the histories to be compressed.''' - # History entries (tx numbers) are TXNUM_LEN bytes each. Distribute - # over rows of up to 50KB in size. A fixed row size means - # future compactions will not need to update the first N - 1 - # rows. - max_row_size = self.max_hist_row_entries * TXNUM_LEN - full_hist = b''.join(hist_list) - nrows = (len(full_hist) + max_row_size - 1) // max_row_size - if nrows > 4: - self.logger.info( - f'hashX {hash_to_hex_str(hashX)} is large: ' - f'{len(full_hist) // TXNUM_LEN:,d} entries across {nrows:,d} rows' - ) - - # Find what history needs to be written, and what keys need to - # be deleted. Start by assuming all keys are to be deleted, - # and then remove those that are the same on-disk as when - # compacted. - write_size = 0 - keys_to_delete.update(hist_map) - for n, chunk in enumerate(util.chunks(full_hist, max_row_size)): - key = hashX + pack_be_uint16(n) - if hist_map.get(key) == chunk: - keys_to_delete.remove(key) - else: - write_items.append((key, chunk)) - write_size += len(chunk) - - assert n + 1 == nrows - self.comp_flush_count = max(self.comp_flush_count, n) - - return write_size - - def _compact_prefix(self, prefix, write_items, keys_to_delete): - '''Compact all history entries for hashXs beginning with the - given prefix. Update keys_to_delete and write.''' - prior_hashX = None - hist_map = {} - hist_list = [] - - key_len = HASHX_LEN + FLUSHID_LEN - write_size = 0 - for key, hist in self.db.iterator(prefix=prefix): - # Ignore non-history entries - if len(key) != key_len: - continue - hashX = key[:-FLUSHID_LEN] - if hashX != prior_hashX and prior_hashX: - write_size += self._compact_hashX(prior_hashX, hist_map, - hist_list, write_items, - keys_to_delete) - hist_map.clear() - hist_list.clear() - prior_hashX = hashX - hist_map[key] = hist - hist_list.append(hist) - - if prior_hashX: - write_size += self._compact_hashX(prior_hashX, hist_map, hist_list, - write_items, keys_to_delete) - return write_size - - def _compact_history(self, limit): - '''Inner loop of history compaction. Loops until limit bytes have - been processed. + async def get_intermediate_statushash_for_hashx( + self, + *, + hashX: bytes, + txnum_max: int = None, + ) -> Tuple[int, bytes]: + f = partial(self.fs_get_intermediate_statushash_for_hashx, hashX=hashX, txnum_max=txnum_max) + return await run_in_thread(f) + + def store_intermediate_statushash_for_hashx( + self, + *, + hashX: bytes, + tx_num: int, + status: bytes, + ) -> None: + '''For a hashX, store a partial status calculated up to (and including) tx_num. + tx_num must be at a reorg-safe depth. + The status is only stored in memory at first; it will be written to the DB + during the next flush(). 
''' - keys_to_delete = set() - write_items = [] # A list of (key, value) pairs - write_size = 0 - - # Loop over 2-byte prefixes - cursor = self.comp_cursor - while write_size < limit and cursor < 65536: - prefix = pack_be_uint16(cursor) - write_size += self._compact_prefix(prefix, write_items, - keys_to_delete) - cursor += 1 - - max_rows = self.comp_flush_count + 1 - self._flush_compaction(cursor, write_items, keys_to_delete) - - self.logger.info( - f'history compaction: wrote {len(write_items):,d} rows ' - f'({write_size / 1000000:.1f} MB), removed ' - f'{len(keys_to_delete):,d} rows, largest: {max_rows:,d}, ' - f'{100 * cursor / 65536:.1f}% complete' - ) - return write_size - - def _cancel_compaction(self): - if self.comp_cursor != -1: - self.logger.warning('cancelling in-progress history compaction') - self.comp_flush_count = -1 - self.comp_cursor = -1 - - # - # DB upgrade - # - - def upgrade_db(self): - self.logger.info(f'history DB version: {self.db_version}') - self.logger.info('Upgrading your history DB; this can take some time...') - - def upgrade_cursor(cursor): - count = 0 - prefix = pack_be_uint16(cursor) - key_len = HASHX_LEN + 2 - chunks = util.chunks - with self.db.write_batch() as batch: - batch_put = batch.put - for key, hist in self.db.iterator(prefix=prefix): - # Ignore non-history entries - if len(key) != key_len: - continue - count += 1 - hist = b''.join(item + b'\0' for item in chunks(hist, 4)) - batch_put(key, hist) - self.upgrade_cursor = cursor - self.write_state(batch) - return count - - last = time.monotonic() - count = 0 - - for cursor in range(self.upgrade_cursor + 1, 65536): - count += upgrade_cursor(cursor) - now = time.monotonic() - if now > last + 10: - last = now - self.logger.info(f'DB 3 of 3: {count:,d} entries updated, ' - f'{cursor * 100 / 65536:.1f}% complete') - - self.db_version = max(self.DB_VERSIONS) - self.upgrade_cursor = -1 - with self.db.write_batch() as batch: - self.write_state(batch) - self.logger.info('DB 3 of 3 upgraded successfully') + if hashX not in self._unflushed_hashx_to_statushash: + self._unflushed_hashx_to_statushash[hashX] = [] + # maintain invariant that unflushed statuses are in order (increasing tx_num): + if len(self._unflushed_hashx_to_statushash[hashX]) > 0: + tx_num_last, status_last = self._unflushed_hashx_to_statushash[hashX][-1] + if tx_num <= tx_num_last: + return + self._unflushed_hashx_to_statushash[hashX].append((tx_num, status)) + self._unflushed_statushash_count += 1 diff --git a/electrumx/server/mempool.py b/electrumx/server/mempool.py index 2a7f58933..3fddc644e 100644 --- a/electrumx/server/mempool.py +++ b/electrumx/server/mempool.py @@ -12,7 +12,7 @@ from abc import ABC, abstractmethod from asyncio import Lock from collections import defaultdict -from typing import Sequence, Tuple, TYPE_CHECKING, Type, Dict +from typing import Sequence, Tuple, TYPE_CHECKING, Type, Dict, Optional, Set import math import attr @@ -20,6 +20,7 @@ from electrumx.lib.hash import hash_to_hex_str, hex_str_to_hash from electrumx.lib.util import class_logger, chunks +from electrumx.lib.tx import TXOSpendStatus from electrumx.server.db import UTXO if TYPE_CHECKING: @@ -28,19 +29,18 @@ @attr.s(slots=True) class MemPoolTx: - prevouts = attr.ib() # type: Sequence[Tuple[bytes, int]] - # A pair is a (hashX, value) tuple - in_pairs = attr.ib() - out_pairs = attr.ib() - fee = attr.ib() - size = attr.ib() + prevouts = attr.ib() # type: Sequence[Tuple[bytes, int]] # (txid, txout_idx) + in_pairs = attr.ib() # type: Optional[Sequence[Tuple[bytes, 
int]]] # (hashX, value_in_sats) + out_pairs = attr.ib() # type: Sequence[Tuple[bytes, int]] # (hashX, value_in_sats) + fee = attr.ib() # type: int # in sats + size = attr.ib() # type: int # in vbytes @attr.s(slots=True) class MemPoolTxSummary: - hash = attr.ib() - fee = attr.ib() - has_unconfirmed_inputs = attr.ib() + hash = attr.ib() # type: bytes + fee = attr.ib() # type: int # in sats + has_unconfirmed_inputs = attr.ib() # type: bool class DBSyncError(Exception): @@ -52,17 +52,17 @@ class MemPoolAPI(ABC): and used by it to query DB and blockchain state.''' @abstractmethod - async def height(self): + async def height(self) -> int: '''Query bitcoind for its height.''' @abstractmethod - def cached_height(self): + def cached_height(self) -> Optional[int]: '''Return the height of bitcoind the last time it was queried, for any reason, without actually querying it. ''' @abstractmethod - def db_height(self): + def db_height(self) -> int: '''Return the height flushed to the on-disk DB.''' @abstractmethod @@ -79,17 +79,25 @@ async def raw_transactions(self, hex_hashes): @abstractmethod async def lookup_utxos(self, prevouts): - '''Return a list of (hashX, value) pairs each prevout if unspent, - otherwise return None if spent or not found. + '''Return a list of (hashX, value) pairs, one for each prevout if unspent, + otherwise return None if spent or not found (for the given prevout). - prevouts - an iterable of (hash, index) pairs + prevouts - an iterable of (tx_hash, txout_idx) pairs ''' @abstractmethod - async def on_mempool(self, touched, height): - '''Called each time the mempool is synchronized. touched is a set of - hashXs touched since the previous call. height is the - daemon's height at the time the mempool was obtained.''' + async def on_mempool( + self, + *, + touched_hashxs: Set[bytes], + touched_outpoints: Set[Tuple[bytes, int]], + height: int, + ): + '''Called each time the mempool is synchronized. touched_hashxs and + touched_outpoints are sets of hashXs and tx outpoints touched since + the previous call. height is the daemon's height at the time the + mempool was obtained. + ''' class MemPool: @@ -111,8 +119,9 @@ def __init__(self, coin: Type['Coin'], api: MemPoolAPI, refresh_secs=5.0, log_st self.coin = coin self.api = api self.logger = class_logger(__name__, self.__class__.__name__) - self.txs = {} - self.hashXs = defaultdict(set) # None can be a key + self.txs = {} # type: Dict[bytes, MemPoolTx] # txid->tx + self.hashXs = defaultdict(set) # type: Dict[Optional[bytes], Set[bytes]] # hashX->txids + self.txo_to_spender = {} # type: Dict[Tuple[bytes, int], bytes] # prevout->txid self.cached_compact_histogram = [] self.refresh_secs = refresh_secs self.log_status_secs = log_status_secs @@ -129,8 +138,9 @@ async def _logging(self, synchronized_event): self.logger.info(f'synced in {elapsed:.2f}s') while True: mempool_size = sum(tx.size for tx in self.txs.values()) / 1_000_000 - self.logger.info(f'{len(self.txs):,d} txs {mempool_size:.2f} MB ' - f'touching {len(self.hashXs):,d} addresses') + self.logger.info(f'{len(self.txs):,d} txs {mempool_size:.2f} MB, ' + f'touching {len(self.hashXs):,d} addresses. 
' + f'{len(self.txo_to_spender):,d} spends.') await sleep(self.log_status_secs) await synchronized_event.wait() @@ -195,7 +205,15 @@ def _compress_histogram( prev_fee_rate = fee_rate return compact - def _accept_transactions(self, tx_map, utxo_map, touched): + def _accept_transactions( + self, + *, + tx_map: Dict[bytes, MemPoolTx], # txid->tx + utxo_map: Dict[Tuple[bytes, int], Tuple[bytes, int]], # prevout->(hashX,value_in_sats) + touched_hashxs: Set[bytes], # set of hashXs + touched_outpoints: Set[Tuple[bytes, int]], # set of outpoints + ) -> Tuple[Dict[bytes, MemPoolTx], + Dict[Tuple[bytes, int], Tuple[bytes, int]]]: '''Accept transactions in tx_map to the mempool if all their inputs can be found in the existing mempool or a utxo_map from the DB. @@ -204,11 +222,12 @@ def _accept_transactions(self, tx_map, utxo_map, touched): ''' hashXs = self.hashXs txs = self.txs + txo_to_spender = self.txo_to_spender deferred = {} unspent = set(utxo_map) # Try to find all prevouts so we can accept the TX - for hash, tx in tx_map.items(): + for tx_hash, tx in tx_map.items(): in_pairs = [] try: for prevout in tx.prevouts: @@ -219,7 +238,7 @@ def _accept_transactions(self, tx_map, utxo_map, touched): utxo = txs[prev_hash].out_pairs[prev_index] in_pairs.append(utxo) except KeyError: - deferred[hash] = tx + deferred[tx_hash] = tx continue # Spend the prevouts @@ -231,19 +250,25 @@ def _accept_transactions(self, tx_map, utxo_map, touched): # because some in_parts would be missing tx.fee = max(0, (sum(v for _, v in tx.in_pairs) - sum(v for _, v in tx.out_pairs))) - txs[hash] = tx + txs[tx_hash] = tx for hashX, _value in itertools.chain(tx.in_pairs, tx.out_pairs): - touched.add(hashX) - hashXs[hashX].add(hash) + touched_hashxs.add(hashX) + hashXs[hashX].add(tx_hash) + for prevout in tx.prevouts: + txo_to_spender[prevout] = tx_hash + touched_outpoints.add(prevout) + for out_idx, out_pair in enumerate(tx.out_pairs): + touched_outpoints.add((tx_hash, out_idx)) return deferred, {prevout: utxo_map[prevout] for prevout in unspent} async def _refresh_hashes(self, synchronized_event): '''Refresh our view of the daemon's mempool.''' - # Touched accumulates between calls to on_mempool and each + # touched_* accumulates between calls to on_mempool and each # call transfers ownership - touched = set() + touched_hashxs = set() + touched_outpoints = set() while True: height = self.api.cached_height() hex_hashes = await self.api.mempool_hashes() @@ -252,7 +277,12 @@ async def _refresh_hashes(self, synchronized_event): hashes = {hex_str_to_hash(hh) for hh in hex_hashes} try: async with self.lock: - await self._process_mempool(hashes, touched, height) + await self._process_mempool( + all_hashes=hashes, + touched_hashxs=touched_hashxs, + touched_outpoints=touched_outpoints, + mempool_height=height, + ) except DBSyncError: # The UTXO DB is not at the same height as the # mempool; wait and try again @@ -260,14 +290,27 @@ async def _refresh_hashes(self, synchronized_event): else: synchronized_event.set() synchronized_event.clear() - await self.api.on_mempool(touched, height) - touched = set() + await self.api.on_mempool( + touched_hashxs=touched_hashxs, + touched_outpoints=touched_outpoints, + height=height, + ) + touched_hashxs = set() + touched_outpoints = set() await sleep(self.refresh_secs) - async def _process_mempool(self, all_hashes, touched, mempool_height): + async def _process_mempool( + self, + *, + all_hashes: Set[bytes], # set of txids + touched_hashxs: Set[bytes], # set of hashXs + touched_outpoints: 
Set[Tuple[bytes, int]], # set of outpoints + mempool_height: int, + ) -> None: # Re-sync with the new set of hashes txs = self.txs hashXs = self.hashXs + txo_to_spender = self.txo_to_spender if mempool_height != self.api.db_height(): raise DBSyncError @@ -275,20 +318,32 @@ async def _process_mempool(self, all_hashes, touched, mempool_height): # First handle txs that have disappeared for tx_hash in (set(txs) - all_hashes): tx = txs.pop(tx_hash) + # hashXs tx_hashXs = {hashX for hashX, value in tx.in_pairs} tx_hashXs.update(hashX for hashX, value in tx.out_pairs) for hashX in tx_hashXs: hashXs[hashX].remove(tx_hash) if not hashXs[hashX]: del hashXs[hashX] - touched |= tx_hashXs + touched_hashxs |= tx_hashXs + # outpoints + for prevout in tx.prevouts: + del txo_to_spender[prevout] + touched_outpoints.add(prevout) + for out_idx, out_pair in enumerate(tx.out_pairs): + touched_outpoints.add((tx_hash, out_idx)) # Process new transactions new_hashes = list(all_hashes.difference(txs)) if new_hashes: group = TaskGroup() for hashes in chunks(new_hashes, 200): - coro = self._fetch_and_accept(hashes, all_hashes, touched) + coro = self._fetch_and_accept( + hashes=hashes, + all_hashes=all_hashes, + touched_hashxs=touched_hashxs, + touched_outpoints=touched_outpoints, + ) await group.spawn(coro) if mempool_height != self.api.db_height(): raise DBSyncError @@ -304,14 +359,23 @@ async def _process_mempool(self, all_hashes, touched, mempool_height): # FIXME: this is not particularly efficient while tx_map and len(tx_map) != prior_count: prior_count = len(tx_map) - tx_map, utxo_map = self._accept_transactions(tx_map, utxo_map, - touched) + tx_map, utxo_map = self._accept_transactions( + tx_map=tx_map, + utxo_map=utxo_map, + touched_hashxs=touched_hashxs, + touched_outpoints=touched_outpoints, + ) if tx_map: self.logger.error(f'{len(tx_map)} txs dropped') - return touched - - async def _fetch_and_accept(self, hashes, all_hashes, touched): + async def _fetch_and_accept( + self, + *, + hashes: Set[bytes], # set of txids + all_hashes: Set[bytes], # set of txids + touched_hashxs: Set[bytes], # set of hashXs + touched_outpoints: Set[Tuple[bytes, int]], # set of outpoints + ): '''Fetch a list of mempool transactions.''' hex_hashes_iter = (hash_to_hex_str(hash) for hash in hashes) raw_txs = await self.api.raw_transactions(hex_hashes_iter) @@ -334,8 +398,13 @@ def deserialize_txs(): # This function is pure if not txin.is_generation()) txout_pairs = tuple((to_hashX(txout.pk_script), txout.value) for txout in tx.outputs) - txs[hash] = MemPoolTx(txin_pairs, None, txout_pairs, - 0, tx_size) + txs[hash] = MemPoolTx( + prevouts=txin_pairs, + in_pairs=None, + out_pairs=txout_pairs, + fee=0, + size=tx_size, + ) return txs # Thread this potentially slow operation so as not to block @@ -352,7 +421,12 @@ def deserialize_txs(): # This function is pure utxos = await self.api.lookup_utxos(prevouts) utxo_map = {prevout: utxo for prevout, utxo in zip(prevouts, utxos)} - return self._accept_transactions(tx_map, utxo_map, touched) + return self._accept_transactions( + tx_map=tx_map, + utxo_map=utxo_map, + touched_hashxs=touched_hashxs, + touched_outpoints=touched_outpoints, + ) # # External interface @@ -396,12 +470,15 @@ async def potential_spends(self, hashX): return result async def transaction_summaries(self, hashX): - '''Return a list of MemPoolTxSummary objects for the hashX.''' + '''Return a list of MemPoolTxSummary objects for the hashX, + sorted as expected by protocol methods. 
+ ''' result = [] for tx_hash in self.hashXs.get(hashX, ()): tx = self.txs[tx_hash] has_ui = any(hash in self.txs for hash, idx in tx.prevouts) result.append(MemPoolTxSummary(tx_hash, tx.fee, has_ui)) + result.sort(key=lambda x: (x.has_unconfirmed_inputs, x.hash)) return result async def unordered_UTXOs(self, hashX): @@ -418,3 +495,36 @@ async def unordered_UTXOs(self, hashX): if hX == hashX: utxos.append(UTXO(-1, pos, tx_hash, 0, value)) return utxos + + async def spender_for_txo(self, prev_txhash: bytes, txout_idx: int) -> 'TXOSpendStatus': + '''For an outpoint, returns its spend-status. + This only considers the mempool, not the DB/blockchain, so e.g. mined + txs are not distinguished from txs that never existed. + ''' + # look up funding tx + prev_tx = self.txs.get(prev_txhash, None) + if prev_tx is None: + # funding tx already mined or never existed + prev_height = None + else: + if len(prev_tx.out_pairs) <= txout_idx: + # output idx out of bounds...? + return TXOSpendStatus(prev_height=None) + prev_has_ui = any(hash in self.txs for hash, idx in prev_tx.prevouts) + prev_height = -prev_has_ui + prevout = (prev_txhash, txout_idx) + # look up spending tx + spender_txhash = self.txo_to_spender.get(prevout, None) + spender_tx = self.txs.get(spender_txhash, None) + if spender_tx is None: + self.logger.warning(f"spender_tx {hash_to_hex_str(spender_txhash)} not in" + f"mempool, but txo_to_spender referenced it as spender " + f"of {hash_to_hex_str(prev_txhash)}:{txout_idx} ?!") + return TXOSpendStatus(prev_height=prev_height) + spender_has_ui = any(hash in self.txs for hash, idx in spender_tx.prevouts) + spender_height = -spender_has_ui + return TXOSpendStatus( + prev_height=prev_height, + spender_txhash=spender_txhash, + spender_height=spender_height, + ) diff --git a/electrumx/server/session.py b/electrumx/server/session.py index e5cb17039..29972893c 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -18,7 +18,8 @@ from collections import defaultdict from functools import partial from ipaddress import IPv4Address, IPv6Address, IPv4Network, IPv6Network -from typing import Optional, TYPE_CHECKING +from typing import (Optional, TYPE_CHECKING, Tuple, Sequence, Set, Dict, Iterable, Any, Mapping, + List) import asyncio import attr @@ -72,6 +73,17 @@ def non_negative_integer(value): f'{value} should be a non-negative integer') +def integer(value): + '''Return param value it is or can be converted to an + integer, otherwise raise an RPCError.''' + try: + return int(value) + except (ValueError, TypeError): + pass + raise RPCError(BAD_REQUEST, + f'{value} should be a non-negative integer') + + def assert_boolean(value): '''Return param value it is boolean otherwise raise an RPCError.''' if value in (False, True): @@ -93,11 +105,51 @@ def assert_tx_hash(value): raise RPCError(BAD_REQUEST, f'{value} should be a transaction hash') +def assert_status_hash(value): + '''Raise an RPCError if the value is not a valid hexadecimal scripthash status. + + If it is valid, return it as 32-byte binary hash. 
+ ''' + # note that unlike for tx_hash, we keep the endianness here + try: + raw_hash = util.hex_to_bytes(value) + if len(raw_hash) == 32: + return raw_hash + except (ValueError, TypeError): + pass + raise RPCError(BAD_REQUEST, f'{value} should be a scripthash status') + + +def is_hex_str(text: Any) -> bool: + if not isinstance(text, str): + return False + try: + b = bytes.fromhex(text) + except Exception: + return False + # forbid whitespaces in text: + if len(text) != 2 * len(b): + return False + return True + + +def assert_hex_str(value: Any) -> None: + if not is_hex_str(value): + raise RPCError(BAD_REQUEST, f'{value} should be a hex str') + + +# Constants used to limit the size of returned history to ensure it fits within a response. +# These are all overestimating somewhat, for paranoia. +HISTORY_OVER_WIRE_OVERHEAD_BYTES = 200 # a few bytes are reserved for overhead +HISTORY_CONF_ITEM_SIZE_BYTES = 107 # a conf tx item is ~this many bytes when JSON-encoded +HISTORY_UNCONF_ITEM_SIZE_BYTES = 140 # a mempool tx item is ~this many bytes when JSON-encoded + + @attr.s(slots=True) class SessionGroup: name = attr.ib() weight = attr.ib() - sessions = attr.ib() + sessions = attr.ib() # type: Set[ElectrumX] retained_cost = attr.ib() def session_cost(self): @@ -138,16 +190,13 @@ def __init__( self.shutdown_event = shutdown_event self.logger = util.class_logger(__name__, self.__class__.__name__) self.servers = {} # service->server - self.sessions = {} # session->iterable of its SessionGroups - self.session_groups = {} # group name->SessionGroup instance + self.sessions = {} # type: Dict[ElectrumX, Iterable[SessionGroup]] + self.session_groups = {} # type: Dict[str, SessionGroup] self.txs_sent = 0 # Would use monotonic time, but aiorpcx sessions use Unix time: self.start_time = time.time() self._method_counts = defaultdict(int) self._reorg_count = 0 - self._history_cache = pylru.lrucache(1000) - self._history_lookups = 0 - self._history_hits = 0 self._tx_hashes_cache = pylru.lrucache(1000) self._tx_hashes_lookups = 0 self._tx_hashes_hits = 0 @@ -304,7 +353,7 @@ async def _recalc_concurrency(self): # cost_decay_per_sec. 
for session in self.sessions: # Subs have an on-going cost so decay more slowly with more subs - session.cost_decay_per_sec = hard_limit / (10000 + 5 * session.sub_count()) + session.cost_decay_per_sec = hard_limit / (10000 + 5 * session.sub_count_total()) session.recalc_concurrency() def _get_info(self): @@ -316,10 +365,7 @@ def _get_info(self): 'daemon': self.daemon.logged_url(), 'daemon height': self.daemon.cached_height(), 'db height': self.db.db_height, - 'db_flush_count': self.db.history.flush_count, 'groups': len(self.session_groups), - 'history cache': cache_fmt.format( - self._history_lookups, self._history_hits, len(self._history_cache)), 'merkle cache': cache_fmt.format( self._merkle_lookups, self._merkle_hits, len(self._merkle_cache)), 'pid': os.getpid(), @@ -328,11 +374,14 @@ def _get_info(self): 'request total': sum(self._method_counts.values()), 'sessions': { 'count': len(sessions), - 'count with subs': sum(len(getattr(s, 'hashX_subs', ())) > 0 for s in sessions), + 'count with subs_sh': sum(s.sub_count_scripthashes() > 0 for s in sessions), + 'count with subs_txo': sum(s.sub_count_txoutpoints() > 0 for s in sessions), + 'count with subs_any': sum(s.sub_count_total() > 0 for s in sessions), 'errors': sum(s.errors for s in sessions), 'logged': len([s for s in sessions if s.log_me]), 'pending requests': sum(s.unanswered_request_count() for s in sessions), - 'subs': sum(s.sub_count() for s in sessions), + 'subs_sh': sum(s.sub_count_scripthashes() for s in sessions), + 'subs_txo': sum(s.sub_count_txoutpoints() for s in sessions), }, 'tx hashes cache': cache_fmt.format( self._tx_hashes_lookups, self._tx_hashes_hits, len(self._tx_hashes_cache)), @@ -354,7 +403,7 @@ def _session_data(self, for_log): session.extra_cost(), session.unanswered_request_count(), session.txs_sent, - session.sub_count(), + session.sub_count_total(), session.recv_count, session.recv_size, session.send_count, session.send_size, now - session.start_time) @@ -371,7 +420,7 @@ def _group_data(self): group.retained_cost, sum(s.unanswered_request_count() for s in sessions), sum(s.txs_sent for s in sessions), - sum(s.sub_count() for s in sessions), + sum(s.sub_count_total() for s in sessions), sum(s.recv_count for s in sessions), sum(s.recv_size for s in sessions), sum(s.send_count for s in sessions), @@ -554,7 +603,7 @@ def arg_to_hashX(arg): if not hashX: continue n = None - history = await db.limited_history(hashX, limit=limit) + history = await db.limited_history(hashX=hashX, limit=limit) for n, (tx_hash, height) in enumerate(history): lines.append(f'History #{n:,d}: height {height:,d} ' f'tx_hash {hash_to_hex_str(tx_hash)}') @@ -679,16 +728,32 @@ async def tx_hashes_func(start, count): branch = [hash_to_hex_str(hash) for hash in branch] return branch, cost / 2500 - async def merkle_branch_for_tx_hash(self, height, tx_hash): - '''Return a triple (branch, tx_pos, cost).''' + async def merkle_branch_for_tx_hash( + self, *, tx_hash: bytes, height: int = None, + ) -> Tuple[int, Sequence[str], int, float]: + '''Returns (height, branch, tx_pos, cost).''' + cost = 0 + tx_pos = None + if height is None: + cost += 0.1 + height, tx_pos = await self.db.get_blockheight_and_txpos_for_txhash(tx_hash) + if height is None: + raise RPCError(BAD_REQUEST, + f'tx {hash_to_hex_str(tx_hash)} not in any block') tx_hashes, tx_hashes_cost = await self.tx_hashes_at_blockheight(height) - try: - tx_pos = tx_hashes.index(tx_hash) - except ValueError: + if tx_pos is None: + try: + tx_pos = tx_hashes.index(tx_hash) + except ValueError: + 
raise RPCError(BAD_REQUEST, + f'tx {hash_to_hex_str(tx_hash)} not in block at height {height:,d}') + elif not (len(tx_hashes) > tx_pos and tx_hashes[tx_pos] == tx_hash): + # there was a reorg while processing the request... TODO maybe retry? raise RPCError(BAD_REQUEST, - f'tx {hash_to_hex_str(tx_hash)} not in block at height {height:,d}') + f'tx {hash_to_hex_str(tx_hash)} was reorged while processing request') branch, merkle_cost = await self._merkle_branch(height, tx_hashes, tx_pos) - return branch, tx_pos, tx_hashes_cost + merkle_cost + cost += tx_hashes_cost + merkle_cost + return height, branch, tx_pos, cost async def merkle_branch_for_tx_pos(self, height, tx_pos): '''Return a triple (branch, tx_hash_hex, cost).''' @@ -751,41 +816,25 @@ async def broadcast_transaction(self, raw_tx): self.txs_sent += 1 return hex_hash - async def limited_history(self, hashX): - '''Returns a pair (history, cost). - - History is a sorted list of (tx_hash, height) tuples, or an RPCError.''' - # History DoS limit. Each element of history is about 99 bytes when encoded - # as JSON. - limit = self.env.max_send // 99 - cost = 0.1 - self._history_lookups += 1 - try: - result = self._history_cache[hashX] - self._history_hits += 1 - except KeyError: - result = await self.db.limited_history(hashX, limit=limit) - cost += 0.1 + len(result) * 0.001 - if len(result) >= limit: - result = RPCError(BAD_REQUEST, f'history too large', cost=cost) - self._history_cache[hashX] = result - - if isinstance(result, Exception): - raise result - return result, cost - - async def _notify_sessions(self, height, touched): + async def _notify_sessions( + self, + *, + touched_hashxs: Set[bytes], + touched_outpoints: Set[Tuple[bytes, int]], + height: int, + ): '''Notify sessions about height changes and touched addresses.''' height_changed = height != self.notified_height if height_changed: await self._refresh_hsub_results(height) - # Invalidate our history cache for touched hashXs - cache = self._history_cache - for hashX in set(cache).intersection(touched): - del cache[hashX] for session in self.sessions: - await self._task_group.spawn(session.notify, touched, height_changed) + coro = session.notify( + touched_hashxs=touched_hashxs, + touched_outpoints=touched_outpoints, + height_changed=height_changed, + ) + await self._task_group.spawn(coro) def _ip_addr_group_name(self, session) -> Optional[str]: host = session.remote_address().host @@ -861,6 +910,8 @@ def __init__( self.env = session_mgr.env self.coin = self.env.coin self.client = 'unknown' + self.sv_seen = False # has seen 'server.version' message? + self.sv_negotiated = asyncio.Event() # done negotiating protocol version self.anon_logs = self.env.anon_logs self.txs_sent = 0 self.log_me = SessionBase.log_new @@ -875,7 +926,13 @@ def __init__( self.session_mgr.add_session(self) self.recalc_concurrency() # must be called after session_mgr.add_session - async def notify(self, touched, height_changed): + async def notify( + self, + *, + touched_hashxs: Set[bytes], + touched_outpoints: Set[Tuple[bytes, int]], + height_changed: bool, + ): pass def default_framer(self): @@ -911,9 +968,15 @@ async def connection_lost(self): msg = 'disconnected' + msg self.logger.info(msg) - def sub_count(self): + def sub_count_scripthashes(self): return 0 + def sub_count_txoutpoints(self): + return 0 + + def sub_count_total(self): + return self.sub_count_scripthashes() + self.sub_count_txoutpoints() + async def handle_request(self, request): '''Handle an incoming request. 
ElectrumX doesn't receive notifications from client sessions. @@ -924,31 +987,49 @@ async def handle_request(self, request): handler = None method = 'invalid method' if handler is None else request.method - # If DROP_CLIENT_UNKNOWN is enabled, check if the client identified - # by calling server.version previously. If not, disconnect the session - if self.env.drop_client_unknown and method != 'server.version' and self.client == 'unknown': - self.logger.info(f'disconnecting because client is unknown') - raise ReplyAndDisconnect( - BAD_REQUEST, f'use server.version to identify client') + # Version negotiation must happen before any other messages. + if not self.sv_seen and method != 'server.version': + self.logger.info(f'closing session: server.version must be first msg. got: {method}') + await self._do_crash_old_electrum_client() + raise ReplyAndDisconnect(BAD_REQUEST, f'use server.version to identify client') + # Wait for version negotiation to finish before processing other messages. + if method != 'server.version' and not self.sv_negotiated.is_set(): + await self.sv_negotiated.wait() self.session_mgr._method_counts[method] += 1 coro = handler_invocation(handler, request)() return await coro + async def maybe_crash_old_client(self, ptuple, crash_client_ver): + if crash_client_ver: + client_ver = util.protocol_tuple(self.client) + is_old_protocol = ptuple is None or ptuple <= (1, 2) + is_old_client = client_ver != (0,) and client_ver <= crash_client_ver + if is_old_protocol and is_old_client: + await self._do_crash_old_electrum_client() + + async def _do_crash_old_electrum_client(self): + self.logger.info(f'attempting to crash old client with version {self.client}') + # this can crash electrum client 2.6 <= v < 3.1.2 + await self.send_notification('blockchain.relayfee', ()) + # this can crash electrum client (v < 2.8.2) UNION (3.0.0 <= v < 3.3.0) + await self.send_notification('blockchain.estimatefee', ()) + class ElectrumX(SessionBase): '''A TCP server that handles incoming Electrum connections.''' PROTOCOL_MIN = (1, 4) - PROTOCOL_MAX = (1, 4, 2) + PROTOCOL_MAX = (1, 5) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.subscribe_headers = False self.connection.max_response_size = self.env.max_send - self.hashX_subs = {} - self.sv_seen = False - self.mempool_statuses = {} + self.hashX_subs = {} # type: Dict[bytes, bytes] # hashX -> scripthash + self.txoutpoint_subs = set() # type: Set[Tuple[bytes, int]] + self.mempool_hashX_statuses = {} # type: Dict[bytes, str] + self.mempool_txoutpoint_statuses = {} # type: Dict[Tuple[bytes, int], Mapping[str, Any]] self.set_request_handlers(self.PROTOCOL_MIN) self.is_peer = False self.cost = 5.0 # Connection cost @@ -1000,35 +1081,47 @@ def on_disconnect_due_to_excessive_session_cost(self): group_names = [group.name for group in groups] self.logger.info(f"closing session over res usage. ip: {ip_addr}. 
groups: {group_names}") - def sub_count(self): + def sub_count_scripthashes(self): return len(self.hashX_subs) + def sub_count_txoutpoints(self): + return len(self.txoutpoint_subs) + def unsubscribe_hashX(self, hashX): - self.mempool_statuses.pop(hashX, None) + self.mempool_hashX_statuses.pop(hashX, None) return self.hashX_subs.pop(hashX, None) - async def notify(self, touched, height_changed): + async def notify( + self, + *, + touched_hashxs: Set[bytes], + touched_outpoints: Set[Tuple[bytes, int]], + height_changed: bool, + ): '''Notify the client about changes to touched addresses (from mempool updates or new blocks) and height. ''' + # block headers if height_changed and self.subscribe_headers: args = (await self.subscribe_headers_result(), ) await self.send_notification('blockchain.headers.subscribe', args) - touched = touched.intersection(self.hashX_subs) - if touched or (height_changed and self.mempool_statuses): + # hashXs + num_hashx_notifs_sent = 0 + touched_hashxs = touched_hashxs.intersection(self.hashX_subs) + if touched_hashxs or (height_changed and self.mempool_hashX_statuses): changed = {} - for hashX in touched: + for hashX in touched_hashxs: alias = self.hashX_subs.get(hashX) if alias: status = await self.subscription_address_status(hashX) changed[alias] = status # Check mempool hashXs - the status is a function of the confirmed state of - # other transactions. - mempool_statuses = self.mempool_statuses.copy() - for hashX, old_status in mempool_statuses.items(): + # other transactions. (this is to detect if height changed from -1 to 0) + mempool_hashX_statuses = self.mempool_hashX_statuses.copy() + for hashX, old_status in mempool_hashX_statuses.items(): alias = self.hashX_subs.get(hashX) if alias: status = await self.subscription_address_status(hashX) @@ -1038,10 +1131,36 @@ async def notify(self, touched, height_changed): method = 'blockchain.scripthash.subscribe' for alias, status in changed.items(): await self.send_notification(method, (alias, status)) - - if changed: - es = '' if len(changed) == 1 else 'es' - self.logger.info(f'notified of {len(changed):,d} address{es}') + num_hashx_notifs_sent = len(changed) + + # tx outpoints + num_txo_notifs_sent = 0 + touched_outpoints = touched_outpoints.intersection(self.txoutpoint_subs) + if touched_outpoints or (height_changed and self.mempool_txoutpoint_statuses): + method = 'blockchain.outpoint.subscribe' + txo_to_status = {} + for prevout in touched_outpoints: + txo_to_status[prevout] = await self.txoutpoint_status(*prevout) + + # Check mempool TXOs - the status is a function of the confirmed state of + # other transactions. 
(this is to detect if height changed from -1 to 0) + mempool_txoutpoint_statuses = self.mempool_txoutpoint_statuses.copy() + for prevout, old_status in mempool_txoutpoint_statuses.items(): + status = await self.txoutpoint_status(*prevout) + if status != old_status: + txo_to_status[prevout] = status + + for tx_hash, txout_idx in touched_outpoints: + spend_status = txo_to_status[(tx_hash, txout_idx)] + tx_hash_hex = hash_to_hex_str(tx_hash) + await self.send_notification(method, ((tx_hash_hex, txout_idx), spend_status)) + num_txo_notifs_sent = len(touched_outpoints) + + if num_hashx_notifs_sent + num_txo_notifs_sent > 0: + es1 = '' if num_hashx_notifs_sent == 1 else 'es' + s2 = '' if num_txo_notifs_sent == 1 else 's' + self.logger.info(f'notified of {num_hashx_notifs_sent:,d} address{es1} and ' + f'{num_txo_notifs_sent:,d} outpoint{s2}') async def subscribe_headers_result(self): '''The result of a header subscription or notification.''' @@ -1064,14 +1183,21 @@ async def peers_subscribe(self): self.bump_cost(1.0) return self.peer_mgr.on_peers_subscribe(self.is_tor()) - async def address_status(self, hashX): - '''Returns an address status. + async def address_status(self, hashX: bytes) -> Optional[str]: + '''Returns an address status, as hex str (or None).''' + if self.protocol_tuple < (1, 5): + return await self._address_status_proto_legacy(hashX) + else: + return await self._address_status_proto_1_5(hashX) + + async def _address_status_proto_legacy(self, hashX: bytes) -> Optional[str]: + '''Returns an address status, as per protocol older than <1.5. Status is a hex string, but must be None if there is no history. ''' - # Note history is ordered and mempool unordered in electrum-server + # Note both confirmed history and mempool history are ordered # For mempool, height is -1 if it has unconfirmed inputs, otherwise 0 - db_history, cost = await self.session_mgr.limited_history(hashX) + db_history = await self.limited_history(hashX) mempool = await self.mempool.transaction_summaries(hashX) status = ''.join(f'{hash_to_hex_str(tx_hash)}:' @@ -1082,7 +1208,7 @@ async def address_status(self, hashX): for tx in mempool) # Add status hashing cost - self.bump_cost(cost + 0.1 + len(status) * 0.00002) + self.bump_cost(0.1 + len(status) * 0.00002) if status: status = sha256(status.encode()).hex() @@ -1090,9 +1216,64 @@ async def address_status(self, hashX): status = None if mempool: - self.mempool_statuses[hashX] = status + self.mempool_hashX_statuses[hashX] = status else: - self.mempool_statuses.pop(hashX, None) + self.mempool_hashX_statuses.pop(hashX, None) + + return status + + async def _calc_intermediate_status_for_hashX( + self, + *, + hashX: bytes, + txnum_max: int = None, + ) -> bytes: + '''Returns the status of a hashX, considering only confirmed history + up to (<) txnum_max. + TODO maybe also store intermediate status hashes as part of initial sync? to prep cache... 
+ ''' + storestatus_period = self.db.history.STORE_INTERMEDIATE_STATUSHASH_EVERY_N_TXS + reorgsafe_height = self.db.db_height - self.env.reorg_limit + # get partial status from cache + tx_num, status = await self.db.history.get_intermediate_statushash_for_hashx( + hashX=hashX, txnum_max=txnum_max) + while True: + # get a history part from db, update status to incorporate it, and maybe store status + db_history_part = await self.db.limited_history_triples( + hashX=hashX, limit=storestatus_period, txnum_min=tx_num+1, txnum_max=txnum_max) + self.bump_cost(0.3 + len(db_history_part) * 0.001) # cost of history-lookup + self.bump_cost(36 * len(db_history_part) * 0.00002) # cost of hashing mined txs + for (tx_hash, height, tx_num) in db_history_part: + tx_item = tx_hash + util.pack_le_int32(height) + status = sha256(status + tx_item) + if height < reorgsafe_height: + self.db.history.store_intermediate_statushash_for_hashx( + hashX=hashX, tx_num=tx_num, status=status) + # if db_history_part is not max-sized, then there are no more parts. + # (note: even if max-sized, the next part might be empty) + if len(db_history_part) < storestatus_period: + return status + self.logger.info(f"calculated intermediate status for hashX={hashX.hex()}, " + f"up to tx_num={tx_num}") + + async def _address_status_proto_1_5(self, hashX: bytes) -> str: + '''Returns an address status, as per protocol newer than >=1.5''' + # first, consider confirmed history + status = await self._calc_intermediate_status_for_hashX(hashX=hashX) + + # second, consider mempool txs + mempool = await self.mempool.transaction_summaries(hashX) + self.bump_cost(44 * len(mempool) * 0.00002) # cost of hashing mempool txs + for tx in mempool: + height = -tx.has_unconfirmed_inputs + tx_item = tx.hash + util.pack_le_int32(height) + util.pack_le_uint64(tx.fee) + status = sha256(status + tx_item) + status = status.hex() + + if mempool: + self.mempool_hashX_statuses[hashX] = status + else: + self.mempool_hashX_statuses.pop(hashX, None) return status @@ -1105,6 +1286,40 @@ async def subscription_address_status(self, hashX): self.unsubscribe_hashX(hashX) return None + async def txoutpoint_status(self, prev_txhash: bytes, txout_idx: int) -> Dict[str, Any]: + self.bump_cost(0.2) + spend_status = await self.db.spender_for_txo(prev_txhash, txout_idx) + if spend_status.spender_height is not None: + # TXO was created, was mined, was spent, and spend was mined. 
+ assert spend_status.prev_height > 0 + assert spend_status.spender_height > 0 + assert spend_status.spender_txhash is not None + else: + mp_spend_status = await self.mempool.spender_for_txo(prev_txhash, txout_idx) + if mp_spend_status.prev_height is not None: + spend_status.prev_height = mp_spend_status.prev_height + if mp_spend_status.spender_height is not None: + spend_status.spender_height = mp_spend_status.spender_height + if mp_spend_status.spender_txhash is not None: + spend_status.spender_txhash = mp_spend_status.spender_txhash + # convert to json dict the client expects + status = {} + if spend_status.prev_height is not None: + status['height'] = spend_status.prev_height + if spend_status.spender_txhash is not None: + assert spend_status.spender_height is not None + status['spender_txhash'] = hash_to_hex_str(spend_status.spender_txhash) + status['spender_height'] = spend_status.spender_height + + prevout = (prev_txhash, txout_idx) + if ((spend_status.prev_height is not None and spend_status.prev_height <= 0) + or (spend_status.spender_height is not None and spend_status.spender_height <= 0)): + self.mempool_txoutpoint_statuses[prevout] = status + else: + self.mempool_txoutpoint_statuses.pop(prevout, None) + + return status + async def hashX_listunspent(self, hashX): '''Return the list of UTXOs of a script hash, including mempool effects.''' @@ -1138,8 +1353,20 @@ async def scripthash_get_balance(self, scripthash): hashX = scripthash_to_hashX(scripthash) return await self.get_balance(hashX) - async def unconfirmed_history(self, hashX): - # Note unconfirmed history is unordered in electrum-server + async def limited_history(self, hashX): + '''Returns a sorted list of (tx_hash, height) tuples. + Raises RPCError if history would not fit within a response. + ''' + limit_bytes = self.env.max_send - HISTORY_OVER_WIRE_OVERHEAD_BYTES + limit_nconf = limit_bytes // HISTORY_CONF_ITEM_SIZE_BYTES + result = await self.db.limited_history(hashX=hashX, limit=limit_nconf) + self.bump_cost(0.2 + len(result) * 0.001) + if len(result) >= limit_nconf: + raise RPCError(BAD_REQUEST, f'history too large') + return result + + async def unconfirmed_history(self, hashX) -> List[Dict[str, Any]]: + # Note both confirmed history and mempool history are ordered # height is -1 if it has unconfirmed inputs, otherwise 0 result = [{'tx_hash': hash_to_hex_str(tx.hash), 'height': -tx.has_unconfirmed_inputs, @@ -1148,18 +1375,110 @@ async def unconfirmed_history(self, hashX): self.bump_cost(0.25 + len(result) / 50) return result - async def confirmed_and_unconfirmed_history(self, hashX): - # Note history is ordered but unconfirmed is unordered in e-s - history, cost = await self.session_mgr.limited_history(hashX) - self.bump_cost(cost) + async def scripthash_get_history_proto_legacy(self, scripthash): + '''Return the confirmed and unconfirmed history of a scripthash, + as per protocol older than <1.5. + ''' + hashX = scripthash_to_hashX(scripthash) + history = await self.limited_history(hashX) conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height} for tx_hash, height in history] return conf + await self.unconfirmed_history(hashX) - async def scripthash_get_history(self, scripthash): - '''Return the confirmed and unconfirmed history of a scripthash.''' + async def scripthash_get_history_proto_1_5( + self, + scripthash, + from_height=0, + to_height=-1, + client_statushash=None, + client_height=None + ): + '''Return the confirmed and unconfirmed history of a scripthash, + as per protocol newer than >=1.5. 
+ ''' hashX = scripthash_to_hashX(scripthash) - return await self.confirmed_and_unconfirmed_history(hashX) + from_height = non_negative_integer(from_height) + to_height = integer(to_height) + if not (-1 <= to_height): + raise RPCError(BAD_REQUEST, f'{to_height} should be an integer >= -1') + to_height_or_inf = to_height if to_height >= 0 else float('inf') + if not (from_height <= to_height_or_inf): + raise RPCError(BAD_REQUEST, f'from_height={from_height} ' + f'<= to_height={to_height} must hold.') + if (client_statushash is None) != (client_height is None): + raise RPCError(BAD_REQUEST, f'either both or neither of client_statushash and ' + f'client_height must be present') + if client_statushash is not None: + client_statushash = assert_status_hash(client_statushash) + client_height = non_negative_integer(client_height) + if not (from_height <= client_height < to_height_or_inf): + raise RPCError(BAD_REQUEST, f'from_height={from_height} ' + f'<= client_height={client_height} ' + f'< to_height={to_height} must hold.') + # Done sanitising args; start handling + # Check if client status is consistent with server; if so we can fast-forward from_height + if client_statushash is not None: + client_txnum = self.db.get_next_tx_num_after_blockheight(client_height) + server_statushash = await self._calc_intermediate_status_for_hashX( + hashX=hashX, + txnum_max=client_txnum + 1, + ) + if server_statushash == client_statushash: + from_height = client_height + 1 + + # Limit size of returned history to ensure it fits within a response. + # TODO add a min() here so that this won't become "consensus". + # or maybe sessions should negotiate max msg size... + limit_bytes = self.env.max_send - HISTORY_OVER_WIRE_OVERHEAD_BYTES + limit_nconf = limit_bytes // HISTORY_CONF_ITEM_SIZE_BYTES + if from_height == 0: + txnum_min = 0 + else: + txnum_min = self.db.get_next_tx_num_after_blockheight(from_height - 1) + if to_height == -1: + txnum_max = None + else: + txnum_max = self.db.get_next_tx_num_after_blockheight(to_height - 1) + if txnum_min is not None: + db_history = await self.db.limited_history( + hashX=hashX, + limit=limit_nconf, + txnum_min=txnum_min, + txnum_max=txnum_max, + ) + else: + db_history = [] + self.bump_cost(0.2 + len(db_history) * 0.001) + + if len(db_history) >= limit_nconf: + # History might have gotten truncated. + # Note that the truncation might have happened mid-block; + # hence we need to exclude txs in the last block. + _, height_last = db_history[-1] + _, height_first = db_history[0] + assert height_first < height_last, "history cannot even fit one block of txs" + db_history = [(tx_hash, height) for (tx_hash, height) in db_history + if height != height_last] + to_height = height_last + hist_conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height} + for tx_hash, height in db_history] + ret_history = hist_conf + if to_height == -1: + # If conf history is long, mempool hist might not fit within response. + # We either include all mempool txs, or none of them. 
+ limit_nunconf = ((limit_bytes - len(hist_conf) * HISTORY_CONF_ITEM_SIZE_BYTES) + // HISTORY_UNCONF_ITEM_SIZE_BYTES) + hist_unconf = await self.unconfirmed_history(hashX) + if len(hist_unconf) <= limit_nunconf: + ret_history += hist_unconf + else: + to_height = max(self.db.db_height + 1, from_height) + assert (to_height == -1) or (from_height <= to_height) + return { + 'from_height': from_height, + 'to_height': to_height, + 'history': ret_history, + } async def scripthash_get_mempool(self, scripthash): '''Return the mempool transactions touching a scripthash.''' @@ -1184,6 +1503,31 @@ async def scripthash_unsubscribe(self, scripthash): hashX = scripthash_to_hashX(scripthash) return self.unsubscribe_hashX(hashX) is not None + async def txoutpoint_subscribe(self, tx_hash, txout_idx, spk_hint=None): + '''Subscribe to an outpoint. + + spk_hint: scriptPubKey corresponding to the outpoint. Might be used by + other servers, but we don't need and hence ignore it. + ''' + tx_hash = assert_tx_hash(tx_hash) + txout_idx = non_negative_integer(txout_idx) + if spk_hint is not None: + assert_hex_str(spk_hint) + spend_status = await self.txoutpoint_status(tx_hash, txout_idx) + self.txoutpoint_subs.add((tx_hash, txout_idx)) + return spend_status + + async def txoutpoint_unsubscribe(self, tx_hash, txout_idx): + '''Unsubscribe from an outpoint.''' + tx_hash = assert_tx_hash(tx_hash) + txout_idx = non_negative_integer(txout_idx) + self.bump_cost(0.1) + prevout = (tx_hash, txout_idx) + was_subscribed = prevout in self.txoutpoint_subs + self.txoutpoint_subs.discard(prevout) + self.mempool_txoutpoint_statuses.pop(prevout, None) + return was_subscribed + async def _merkle_proof(self, cp_height, height): max_height = self.db.db_height if not height <= cp_height <= max_height: @@ -1218,6 +1562,8 @@ async def block_headers(self, start_height, count, cp_height=0): start_height and count must be non-negative integers. At most MAX_CHUNK_SIZE headers will be returned. ''' + if self.protocol_tuple >= (1, 5): + return await self.block_headers_array(start_height, count, cp_height) start_height = non_negative_integer(start_height) count = non_negative_integer(count) cp_height = non_negative_integer(cp_height) @@ -1234,6 +1580,38 @@ async def block_headers(self, start_height, count, cp_height=0): self.bump_cost(cost) return result + async def block_headers_array(self, start_height, count, cp_height=0): + '''Return block headers in an array for the main chain; + starting at start_height. + start_height and count must be non-negative integers. At most + MAX_CHUNK_SIZE headers will be returned. 
+ ''' + start_height = non_negative_integer(start_height) + count = non_negative_integer(count) + cp_height = non_negative_integer(cp_height) + cost = count / 50 + + max_size = self.MAX_CHUNK_SIZE + count = min(count, max_size) + headers, count = await self.db.read_headers(start_height, count) + result = {'count': count, 'max': max_size, 'headers': []} + if count and cp_height: + cost += 1.0 + last_height = start_height + count - 1 + result.update(await self._merkle_proof(cp_height, last_height)) + + cursor = 0 + height = 0 + while cursor < len(headers): + next_cursor = self.db.header_offset(height + 1) + header = headers[cursor:next_cursor] + result['headers'].append(header.hex()) + cursor = next_cursor + height += 1 + + self.bump_cost(cost) + return result + def is_tor(self): '''Try to detect if the connection is to a tor hidden service we are running.''' @@ -1363,7 +1741,7 @@ async def server_version(self, client_name='', protocol_version=None): ptuple, client_min = util.protocol_version( protocol_version, self.PROTOCOL_MIN, self.PROTOCOL_MAX) - await self.crash_old_client(ptuple, self.env.coin.CRASH_CLIENT_VER) + await self.maybe_crash_old_client(ptuple, self.env.coin.CRASH_CLIENT_VER) if ptuple is None: if client_min > self.PROTOCOL_MIN: @@ -1374,20 +1752,9 @@ async def server_version(self, client_name='', protocol_version=None): BAD_REQUEST, f'unsupported protocol version: {protocol_version}')) self.set_request_handlers(ptuple) + self.sv_negotiated.set() return electrumx.version, self.protocol_version_string() - async def crash_old_client(self, ptuple, crash_client_ver): - if crash_client_ver: - client_ver = util.protocol_tuple(self.client) - is_old_protocol = ptuple is None or ptuple <= (1, 2) - is_old_client = client_ver != (0,) and client_ver <= crash_client_ver - if is_old_protocol and is_old_client: - self.logger.info(f'attempting to crash old client with version {self.client}') - # this can crash electrum client 2.6 <= v < 3.1.2 - await self.send_notification('blockchain.relayfee', ()) - # this can crash electrum client (v < 2.8.2) UNION (3.0.0 <= v < 3.3.0) - await self.send_notification('blockchain.estimatefee', ()) - async def transaction_broadcast(self, raw_tx): '''Broadcast a raw transaction to the network. @@ -1421,14 +1788,24 @@ async def transaction_get(self, tx_hash, verbose=False): tx_hash: the transaction hash as a hexadecimal string verbose: passed on to the daemon ''' - assert_tx_hash(tx_hash) + tx_hash_bytes = assert_tx_hash(tx_hash) + tx_hash_hex = tx_hash + del tx_hash if verbose not in (True, False): raise RPCError(BAD_REQUEST, '"verbose" must be a boolean') self.bump_cost(1.0) - return await self.daemon_request('getrawtransaction', tx_hash, verbose) - async def transaction_merkle(self, tx_hash, height): + blockhash = None + if not self.env.daemon_has_txindex: + height, tx_pos = await self.db.get_blockheight_and_txpos_for_txhash(tx_hash_bytes) + if height is not None: + block_header = self.db.raw_header(height) + blockhash = self.coin.header_hash(block_header).hex() + + return await self.daemon_request('getrawtransaction', tx_hash_hex, verbose, blockhash) + + async def transaction_merkle(self, tx_hash, height=None): '''Return the merkle branch to a confirmed transaction given its hash and height. 
@@ -1436,12 +1813,14 @@ async def transaction_merkle(self, tx_hash, height): height: the height of the block it is in ''' tx_hash = assert_tx_hash(tx_hash) - height = non_negative_integer(height) + if height is not None: + height = non_negative_integer(height) - branch, tx_pos, cost = await self.session_mgr.merkle_branch_for_tx_hash( - height, tx_hash) + height, branch, tx_pos, cost = await self.session_mgr.merkle_branch_for_tx_hash( + tx_hash=tx_hash, height=height) self.bump_cost(cost) + assert height is not None return {"block_height": height, "merkle": branch, "pos": tx_pos} async def transaction_id_from_pos(self, height, tx_pos, merkle=False): @@ -1482,7 +1861,7 @@ def set_request_handlers(self, ptuple): 'blockchain.headers.subscribe': self.headers_subscribe, 'blockchain.relayfee': self.relayfee, 'blockchain.scripthash.get_balance': self.scripthash_get_balance, - 'blockchain.scripthash.get_history': self.scripthash_get_history, + 'blockchain.scripthash.get_history': self.scripthash_get_history_proto_legacy, 'blockchain.scripthash.get_mempool': self.scripthash_get_mempool, 'blockchain.scripthash.listunspent': self.scripthash_listunspent, 'blockchain.scripthash.subscribe': self.scripthash_subscribe, @@ -1503,6 +1882,11 @@ def set_request_handlers(self, ptuple): if ptuple >= (1, 4, 2): handlers['blockchain.scripthash.unsubscribe'] = self.scripthash_unsubscribe + if ptuple >= (1, 5): + handlers['blockchain.outpoint.subscribe'] = self.txoutpoint_subscribe + handlers['blockchain.outpoint.unsubscribe'] = self.txoutpoint_unsubscribe + handlers['blockchain.scripthash.get_history'] = self.scripthash_get_history_proto_1_5 + self.request_handlers = handlers @@ -1511,6 +1895,8 @@ class LocalRPC(SessionBase): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + self.sv_seen = True + self.sv_negotiated.set() self.client = 'RPC' self.connection.max_response_size = 0 @@ -1538,9 +1924,19 @@ def set_request_handlers(self, ptuple): 'protx.info': self.protx_info, }) - async def notify(self, touched, height_changed): + async def notify( + self, + *, + touched_hashxs: Set[bytes], + touched_outpoints: Set[Tuple[bytes, int]], + height_changed: bool, + ): '''Notify the client about changes in masternode list.''' - await super().notify(touched, height_changed) + await super().notify( + touched_hashxs=touched_hashxs, + touched_outpoints=touched_outpoints, + height_changed=height_changed, + ) for mn in self.mns.copy(): status = await self.daemon_request('masternode_list', ('status', mn)) @@ -1751,33 +2147,38 @@ async def block_header(self, height, cp_height=0): return result # Covered by a checkpoint; truncate AuxPoW data - result['header'] = self.truncate_auxpow(result['header'], height) + result['header'] = self.truncate_auxpow_single(result['header']) return result async def block_headers(self, start_height, count, cp_height=0): - result = await super().block_headers(start_height, count, cp_height) - # Older protocol versions don't truncate AuxPoW if self.protocol_tuple < (1, 4, 1): - return result + return await super().block_headers(start_height, count, cp_height) # Not covered by a checkpoint; return full AuxPoW data if cp_height == 0: - return result + return await super().block_headers(start_height, count, cp_height) + + result = await super().block_headers_array(start_height, count, cp_height) # Covered by a checkpoint; truncate AuxPoW data - result['hex'] = self.truncate_auxpow(result['hex'], start_height) - return result + result['headers'] = 
self.truncate_auxpow_headers(result['headers']) - def truncate_auxpow(self, headers_full_hex, start_height): - height = start_height - headers_full = util.hex_to_bytes(headers_full_hex) - cursor = 0 - headers = bytearray() + # Return headers in array form + if self.protocol_tuple >= (1, 5): + return result - while cursor < len(headers_full): - headers += headers_full[cursor:cursor+self.coin.TRUNCATED_HEADER_SIZE] - cursor += self.db.dynamic_header_len(height) - height += 1 + # Return headers in concatenated form + result['hex'] = ''.join(result['headers']) + del result['headers'] + return result + + def truncate_auxpow_headers(self, headers): + result = [] + for header in headers: + result.append(self.truncate_auxpow_single(header)) + return result - return headers.hex() + def truncate_auxpow_single(self, header: str): + # 2 hex chars per byte + return header[:2*self.coin.TRUNCATED_HEADER_SIZE] diff --git a/electrumx/server/storage.py b/electrumx/server/storage.py index a45c27002..377cf44d9 100644 --- a/electrumx/server/storage.py +++ b/electrumx/server/storage.py @@ -65,6 +65,13 @@ def iterator(self, prefix=b'', reverse=False): If `prefix` is set, only keys starting with `prefix` will be included. If `reverse` is True the items are returned in reverse order. + + The iterator supports .seek(key), which moves it just left of `key`. + - if forward-iterating + - if `key` is present, it will be the next item + - if `key` is not present, the next item will be the smallest still greater than `key` + - if reverse-iterating + - the next item will be the largest still smaller than `key` ''' raise NotImplementedError @@ -85,10 +92,32 @@ def open(self, name, create): self.close = self.db.close self.get = self.db.get self.put = self.db.put - self.iterator = self.db.iterator self.write_batch = partial(self.db.write_batch, transaction=True, sync=True) + def iterator(self, prefix=b'', reverse=False): + return LevelDBIterator(db=self.db, prefix=prefix, reverse=reverse) + + +class LevelDBIterator: + '''An iterator for LevelDB.''' + + def __init__(self, *, db, prefix, reverse): + self.prefix = prefix + self.iterator = db.iterator(prefix=prefix, reverse=reverse) + + def __iter__(self): + return self + + def __next__(self): + k, v = next(self.iterator) + if not k.startswith(self.prefix): + raise StopIteration + return k, v + + def seek(self, key: bytes) -> None: + self.iterator.seek(key) + class RocksDB(Storage): '''RocksDB database engine.''' @@ -119,7 +148,7 @@ def write_batch(self): return RocksDBWriteBatch(self.db) def iterator(self, prefix=b'', reverse=False): - return RocksDBIterator(self.db, prefix, reverse) + return RocksDBIterator(db=self.db, prefix=prefix, reverse=reverse) class RocksDBWriteBatch: @@ -140,8 +169,9 @@ def __exit__(self, exc_type, exc_val, exc_tb): class RocksDBIterator: '''An iterator for RocksDB.''' - def __init__(self, db, prefix, reverse): + def __init__(self, *, db, prefix, reverse): self.prefix = prefix + self._is_reverse = reverse if reverse: self.iterator = reversed(db.iteritems()) nxt_prefix = util.increment_byte_string(prefix) @@ -165,3 +195,11 @@ def __next__(self): if not k.startswith(self.prefix): raise StopIteration return k, v + + def seek(self, key: bytes) -> None: + self.iterator.seek(key) + if self._is_reverse: + try: + next(self) + except StopIteration: + self.iterator.seek_to_last() diff --git a/electrumx_compact_history b/electrumx_compact_history deleted file mode 100755 index b2cd5fc25..000000000 --- a/electrumx_compact_history +++ /dev/null @@ -1,82 +0,0 @@ 
-#!/usr/bin/env python3 -# -# Copyright (c) 2017, Neil Booth -# -# All rights reserved. -# -# See the file "LICENCE" for information about the copyright -# and warranty status of this software. - -'''Script to compact the history database. This should save space and -will reset the flush counter to a low number, avoiding overflow when -the flush count reaches 65,536. - -This needs to lock the database so ElectrumX must not be running - -shut it down cleanly first. - -It is recommended you run this script with the same environment as -ElectrumX. However, it is intended to be runnable with just -DB_DIRECTORY and COIN set (COIN defaults as for ElectrumX). - -If you use daemon tools, you might run this script like so: - - envdir /path/to/the/environment/directory ./compact_history.py - -Depending on your hardware, this script may take up to 6 hours to -complete; it logs progress regularly. - -Compaction can be interrupted and restarted harmlessly and will pick -up where it left off. However, if you restart ElectrumX without -running the compaction to completion, it will not benefit and -subsequent compactions will restart from the beginning. -''' - -import asyncio -import logging -import sys -import traceback -from os import environ - -from electrumx import Env -from electrumx.server.db import DB - - -async def compact_history(): - if sys.version_info < (3, 7): - raise RuntimeError('Python >= 3.7 is required to run ElectrumX') - - environ['DAEMON_URL'] = '' # Avoid Env erroring out - env = Env() - db = DB(env) - await db.open_for_compacting() - - assert not db.first_sync - history = db.history - # Continue where we left off, if interrupted - if history.comp_cursor == -1: - history.comp_cursor = 0 - - history.comp_flush_count = max(history.comp_flush_count, 1) - limit = 8 * 1000 * 1000 - - while history.comp_cursor != -1: - history._compact_history(limit) - - # When completed also update the UTXO flush count - db.set_flush_count(history.flush_count) - -def main(): - logging.basicConfig(level=logging.INFO) - logging.info('Starting history compaction...') - loop = asyncio.get_event_loop() - try: - loop.run_until_complete(compact_history()) - except Exception: - traceback.print_exc() - logging.critical('History compaction terminated abnormally') - else: - logging.info('History compaction complete') - - -if __name__ == '__main__': - main() diff --git a/setup.py b/setup.py index d8f942759..18bcf4bb4 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ setuptools.setup( name='e-x', version=version, - scripts=['electrumx_server', 'electrumx_rpc', 'electrumx_compact_history'], + scripts=['electrumx_server', 'electrumx_rpc'], python_requires='>=3.7', install_requires=['aiorpcX[ws]>=0.18.5,<0.19', 'attrs', 'plyvel', 'pylru', 'aiohttp>=3.3,<4'], diff --git a/tests/server/test_compaction.py b/tests/server/test_compaction.py deleted file mode 100644 index ad6c96a43..000000000 --- a/tests/server/test_compaction.py +++ /dev/null @@ -1,133 +0,0 @@ -'''Test of compaction code in server/history.py''' -import array -import random -from os import environ, urandom - -import pytest - -from electrumx.lib.hash import HASHX_LEN -from electrumx.lib.util import pack_be_uint16, pack_le_uint64 -from electrumx.server.env import Env -from electrumx.server.db import DB - - -def create_histories(history, hashX_count=100): - '''Creates a bunch of random transaction histories, and write them - to disk in a series of small flushes.''' - hashXs = [urandom(HASHX_LEN) for n in range(hashX_count)] - mk_array = lambda : 
array.array('Q') - histories = {hashX : mk_array() for hashX in hashXs} - unflushed = history.unflushed - tx_num = 0 - while hashXs: - tx_numb = pack_le_uint64(tx_num)[:5] - hash_indexes = set(random.randrange(len(hashXs)) - for n in range(1 + random.randrange(4))) - for index in hash_indexes: - histories[hashXs[index]].append(tx_num) - unflushed[hashXs[index]].extend(tx_numb) - - tx_num += 1 - # Occasionally flush and drop a random hashX if non-empty - if random.random() < 0.1: - history.flush() - index = random.randrange(0, len(hashXs)) - if histories[hashXs[index]]: - del hashXs[index] - - return histories - - -def check_hashX_compaction(history): - history.max_hist_row_entries = 40 - row_size = history.max_hist_row_entries * 5 - full_hist = b''.join(pack_le_uint64(tx_num)[:5] for tx_num in range(100)) - hashX = urandom(HASHX_LEN) - pairs = ((1, 20), (26, 50), (56, 30)) - - cum = 0 - hist_list = [] - hist_map = {} - for flush_count, count in pairs: - key = hashX + pack_be_uint16(flush_count) - hist = full_hist[cum * 5: (cum+count) * 5] - hist_map[key] = hist - hist_list.append(hist) - cum += count - - write_items = [] - keys_to_delete = set() - write_size = history._compact_hashX(hashX, hist_map, hist_list, - write_items, keys_to_delete) - # Check results for sanity - assert write_size == len(full_hist) - assert len(write_items) == 3 - assert len(keys_to_delete) == 3 - assert len(hist_map) == len(pairs) - for n, item in enumerate(write_items): - assert item == (hashX + pack_be_uint16(n), - full_hist[n * row_size: (n + 1) * row_size]) - for flush_count, count in pairs: - assert hashX + pack_be_uint16(flush_count) in keys_to_delete - - # Check re-compaction is null - hist_map = {key: value for key, value in write_items} - hist_list = [value for key, value in write_items] - write_items.clear() - keys_to_delete.clear() - write_size = history._compact_hashX(hashX, hist_map, hist_list, - write_items, keys_to_delete) - assert write_size == 0 - assert len(write_items) == 0 - assert len(keys_to_delete) == 0 - assert len(hist_map) == len(pairs) - - # Check re-compaction adding a single tx writes the one row - hist_list[-1] += array.array('I', [100]).tobytes() - write_size = history._compact_hashX(hashX, hist_map, hist_list, - write_items, keys_to_delete) - assert write_size == len(hist_list[-1]) - assert write_items == [(hashX + pack_be_uint16(2), hist_list[-1])] - assert len(keys_to_delete) == 1 - assert write_items[0][0] in keys_to_delete - assert len(hist_map) == len(pairs) - - -def check_written(history, histories): - for hashX, hist in histories.items(): - db_hist = array.array('I', history.get_txnums(hashX, limit=None)) - assert hist == db_hist - - -def compact_history(history): - '''Synchronously compact the DB history.''' - history.comp_cursor = 0 - - history.comp_flush_count = max(history.comp_flush_count, 1) - limit = 5 * 1000 - - write_size = 0 - while history.comp_cursor != -1: - write_size += history._compact_history(limit) - assert write_size != 0 - - -@pytest.mark.asyncio -async def test_compaction(tmpdir): - db_dir = str(tmpdir) - print(f'Temp dir: {db_dir}') - environ.clear() - environ['DB_DIRECTORY'] = db_dir - environ['DAEMON_URL'] = '' - environ['COIN'] = 'BitcoinSV' - db = DB(Env()) - await db.open_for_serving() - history = db.history - - # Test abstract compaction - check_hashX_compaction(history) - # Now test in with random data - histories = create_histories(history) - check_written(history, histories) - compact_history(history) - check_written(history, histories) diff 
--git a/tests/server/test_env.py b/tests/server/test_env.py index 84595bbb5..0a7e1f4b9 100644 --- a/tests/server/test_env.py +++ b/tests/server/test_env.py @@ -415,17 +415,3 @@ def test_ban_versions(): def test_coin_class_provided(): e = Env(lib_coins.BitcoinSV) assert e.coin == lib_coins.BitcoinSV - - -def test_drop_unknown_clients(): - e = Env() - assert e.drop_client_unknown is False - os.environ['DROP_CLIENT_UNKNOWN'] = "" - e = Env() - assert e.drop_client_unknown is False - os.environ['DROP_CLIENT_UNKNOWN'] = "1" - e = Env() - assert e.drop_client_unknown is True - os.environ['DROP_CLIENT_UNKNOWN'] = "whatever" - e = Env() - assert e.drop_client_unknown is True diff --git a/tests/server/test_mempool.py b/tests/server/test_mempool.py index 2ee33b93b..625754140 100644 --- a/tests/server/test_mempool.py +++ b/tests/server/test_mempool.py @@ -210,34 +210,20 @@ def cached_height(self): return self._cached_height async def mempool_hashes(self): - '''Query bitcoind for the hashes of all transactions in its - mempool, returned as a list.''' await sleep(0) return [hash_to_hex_str(hash) for hash in self.txs] async def raw_transactions(self, hex_hashes): - '''Query bitcoind for the serialized raw transactions with the given - hashes. Missing transactions are returned as None. - - hex_hashes is an iterable of hexadecimal hash strings.''' await sleep(0) hashes = [hex_str_to_hash(hex_hash) for hex_hash in hex_hashes] return [self.raw_txs.get(hash) for hash in hashes] async def lookup_utxos(self, prevouts): - '''Return a list of (hashX, value) pairs each prevout if unspent, - otherwise return None if spent or not found. - - prevouts - an iterable of (hash, index) pairs - ''' await sleep(0) return [self.db_utxos.get(prevout) for prevout in prevouts] - async def on_mempool(self, touched, height): - '''Called each time the mempool is synchronized. touched is a set of - hashXs touched since the previous call. 
height is the - daemon's height at the time the mempool was obtained.''' - self.on_mempool_calls.append((touched, height)) + async def on_mempool(self, *, touched_hashxs, touched_outpoints, height): + self.on_mempool_calls.append((touched_hashxs, height)) await sleep(0) diff --git a/tests/server/test_notifications.py b/tests/server/test_notifications.py index 435a6c393..006097036 100644 --- a/tests/server/test_notifications.py +++ b/tests/server/test_notifications.py @@ -7,15 +7,15 @@ async def test_simple_mempool(): n = Notifications() notified = [] - async def notify(height, touched): - notified.append((height, touched)) + async def notify(*, touched_hashxs, touched_outpoints, height): + notified.append((height, touched_hashxs)) await n.start(5, notify) - mtouched = {'a', 'b'} - btouched = {'b', 'c'} - await n.on_mempool(mtouched, 6) + mtouched = {b'a', b'b'} + btouched = {b'b', b'c'} + await n.on_mempool(touched_hashxs=mtouched, height=6, touched_outpoints=set()) assert notified == [(5, set())] - await n.on_block(btouched, 6) + await n.on_block(touched_hashxs=btouched, height=6, touched_outpoints=set()) assert notified == [(5, set()), (6, set.union(mtouched, btouched))] @@ -23,23 +23,23 @@ async def notify(height, touched): async def test_enter_mempool_quick_blocks_2(): n = Notifications() notified = [] - async def notify(height, touched): - notified.append((height, touched)) + async def notify(*, touched_hashxs, touched_outpoints, height): + notified.append((height, touched_hashxs)) await n.start(5, notify) # Suppose a gets in block 6 and blocks 7,8 found right after and # the block processer processes them together. - await n.on_mempool({'a'}, 5) - assert notified == [(5, set()), (5, {'a'})] + await n.on_mempool(touched_hashxs={b'a'}, height=5, touched_outpoints=set()) + assert notified == [(5, set()), (5, {b'a'})] # Mempool refreshes with daemon on block 6 - await n.on_mempool({'a'}, 6) - assert notified == [(5, set()), (5, {'a'})] + await n.on_mempool(touched_hashxs={b'a'}, height=6, touched_outpoints=set()) + assert notified == [(5, set()), (5, {b'a'})] # Blocks 6, 7 processed together - await n.on_block({'a', 'b'}, 7) - assert notified == [(5, set()), (5, {'a'})] + await n.on_block(touched_hashxs={b'a', b'b'}, height=7, touched_outpoints=set()) + assert notified == [(5, set()), (5, {b'a'})] # Then block 8 processed - await n.on_block({'c'}, 8) - assert notified == [(5, set()), (5, {'a'})] + await n.on_block(touched_hashxs={b'c'}, height=8, touched_outpoints=set()) + assert notified == [(5, set()), (5, {b'a'})] # Now mempool refreshes - await n.on_mempool(set(), 8) - assert notified == [(5, set()), (5, {'a'}), (8, {'a', 'b', 'c'})] + await n.on_mempool(touched_hashxs=set(), height=8, touched_outpoints=set()) + assert notified == [(5, set()), (5, {b'a'}), (8, {b'a', b'b', b'c'})] diff --git a/tests/server/test_storage.py b/tests/server/test_storage.py index 3cfb6e172..b47b963f5 100644 --- a/tests/server/test_storage.py +++ b/tests/server/test_storage.py @@ -66,6 +66,76 @@ def test_iterator_reverse(db): ] +def test_iterator_seek(db): + db.put(b"first-key1", b"val") + db.put(b"first-key2", b"val") + db.put(b"first-key3", b"val") + db.put(b"key-1", b"value-1") + db.put(b"key-5", b"value-5") + db.put(b"key-3", b"value-3") + db.put(b"key-8", b"value-8") + db.put(b"key-2", b"value-2") + db.put(b"key-4", b"value-4") + db.put(b"last-key1", b"val") + db.put(b"last-key2", b"val") + db.put(b"last-key3", b"val") + # forward-iterate, key present, no prefix + it = db.iterator() + 
it.seek(b"key-4") + assert list(it) == [(b"key-4", b"value-4"), (b"key-5", b"value-5"), (b"key-8", b"value-8"), + (b"last-key1", b"val"), (b"last-key2", b"val"), (b"last-key3", b"val")] + # forward-iterate, key present + it = db.iterator(prefix=b"key-") + it.seek(b"key-4") + assert list(it) == [(b"key-4", b"value-4"), (b"key-5", b"value-5"), + (b"key-8", b"value-8")] + # forward-iterate, key missing + it = db.iterator(prefix=b"key-") + it.seek(b"key-6") + assert list(it) == [(b"key-8", b"value-8")] + # forward-iterate, after last prefix + it = db.iterator(prefix=b"key-") + it.seek(b"key-9") + assert list(it) == [] + # forward-iterate, after last, no prefix + it = db.iterator() + it.seek(b"z") + assert list(it) == [] + # forward-iterate, no such prefix + it = db.iterator(prefix=b"key---") + it.seek(b"key---5") + assert list(it) == [] + # forward-iterate, seek outside prefix + it = db.iterator(prefix=b"key-") + it.seek(b"last-key2") + assert list(it) == [] + # reverse-iterate, key present + it = db.iterator(prefix=b"key-", reverse=True) + it.seek(b"key-4") + assert list(it) == [(b"key-3", b"value-3"), (b"key-2", b"value-2"), (b"key-1", b"value-1")] + # reverse-iterate, key missing + it = db.iterator(prefix=b"key-", reverse=True) + it.seek(b"key-7") + assert list(it) == [(b"key-5", b"value-5"), (b"key-4", b"value-4"), (b"key-3", b"value-3"), + (b"key-2", b"value-2"), (b"key-1", b"value-1")] + # reverse-iterate, before first prefix + it = db.iterator(prefix=b"key-", reverse=True) + it.seek(b"key-0") + assert list(it) == [] + # reverse-iterate, before first, no prefix + it = db.iterator(reverse=True) + it.seek(b"a") + assert list(it) == [] + # reverse-iterate, no such prefix + it = db.iterator(prefix=b"key---", reverse=True) + it.seek(b"key---5") + assert list(it) == [] + # reverse-iterate, seek outside prefix + it = db.iterator(prefix=b"key-", reverse=True) + it.seek(b"first-key2") + assert list(it) == [] + + def test_close(db): db.put(b"a", b"b") db.close()
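The seek() tests above exercise the semantics documented in storage.py. In server code, a typical prefix scan that resumes from a given key might look like this; a minimal sketch where ``db`` is assumed to be one of the Storage backends and the key layout is illustrative only::

    def scan_from(db, prefix: bytes, start_key: bytes):
        '''Yield (key, value) pairs under `prefix`, starting at `start_key`.'''
        it = db.iterator(prefix=prefix)
        # Position just left of start_key: if present it is returned first,
        # otherwise iteration starts at the next larger key.
        it.seek(start_key)
        for key, value in it:
            # Both backends stop once keys no longer share the prefix.
            yield key, value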