Skip to content

Commit

Permalink
Merge pull request #2322 from CounterpartyXCP/develop
Browse files Browse the repository at this point in the history
v10.4.4
  • Loading branch information
adamkrellenstein authored Oct 9, 2024
2 parents 6bfc1a3 + b602cdf commit 679ab89
Show file tree
Hide file tree
Showing 31 changed files with 4,396 additions and 3,859 deletions.
9 changes: 1 addition & 8 deletions .github/workflows/test_compose.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ docker container prune -f
docker rmi counterparty/counterparty:$VERSION || true

# build the counterparty-core new image
docker build -t counterparty/counterparty:$VERSION . > build.txt 2>&1
COUNTERPARTY_RS_CACHED=$(awk '/COPY \.\/counterparty-rs \/counterparty-rs/{getline; print}' build.txt | awk '{print $2}')
cat build.txt
docker build -t counterparty/counterparty:$VERSION .

# re-start containers
docker compose --profile mainnet up -d
Expand Down Expand Up @@ -130,11 +128,6 @@ done
# Run compare hashes test
. "$HOME/.profile"
cd counterparty-core

if [ "$COUNTERPARTY_RS_CACHED" != "CACHED" ]; then
hatch env prune
fi

sudo python3 -m pytest counterpartycore/test/mainnet_test.py --testapidb --comparehashes
cd ..

Expand Down
3,550 changes: 1,770 additions & 1,780 deletions apiary.apib

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions counterparty-core/counterpartycore/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,22 @@ def float_range_checker(arg):
"help": "show logs in JSON format",
},
],
[
("--wsgi-server",),
{
"default": "werkzeug",
"help": "WSGI server to use (gunicorn or werkzeug)",
"choices": ["gunicorn", "werkzeug"],
},
],
[
("--gunicorn-workers",),
{
"type": int,
"default": 2,
"help": "number of worker processes for gunicorn",
},
],
]


Expand Down
150 changes: 46 additions & 104 deletions counterparty-core/counterpartycore/lib/api/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,41 @@
import time
from collections import OrderedDict
from multiprocessing import Process, Value
from threading import Thread, Timer
from threading import Thread

import flask
import requests
from bitcoin.wallet import CBitcoinAddressError
from counterpartycore import server
from counterpartycore.lib import (
backend,
config,
exceptions,
ledger,
sentry,
util,
)
from counterpartycore.lib.api import api_watcher, queries
from counterpartycore.lib.api import api_watcher, queries, wsgi
from counterpartycore.lib.api.routes import ROUTES
from counterpartycore.lib.api.util import (
clean_rowids_and_confirmed_fields,
function_needs_db,
get_backend_height,
init_api_access_log,
inject_details,
to_json,
)
from counterpartycore.lib.database import APIDBConnectionPool, get_db_connection
from counterpartycore.lib.database import APIDBConnectionPool
from flask import Flask, request
from flask_httpauth import HTTPBasicAuth
from sentry_sdk import capture_exception
from sentry_sdk import configure_scope as configure_sentry_scope
from werkzeug.serving import make_server
from sentry_sdk import start_span as start_sentry_span

multiprocessing.set_start_method("spawn", force=True)

logger = logging.getLogger(config.LOGGER_NAME)
auth = HTTPBasicAuth()

BACKEND_HEIGHT = None
CURRENT_BLOCK_TIME = None
REFRESH_BACKEND_HEIGHT_INTERVAL = 10
BACKEND_HEIGHT_TIMER = None

BLOCK_CACHE = OrderedDict()
MAX_BLOCK_CACHE_SIZE = 1000

Expand All @@ -70,33 +65,17 @@ def api_root():
elif config.TESTCOIN:
network = "testcoin"
return {
"server_ready": counterparty_height >= BACKEND_HEIGHT,
"server_ready": counterparty_height >= wsgi.BACKEND_HEIGHT,
"network": network,
"version": config.VERSION_STRING,
"backend_height": BACKEND_HEIGHT,
"backend_height": wsgi.BACKEND_HEIGHT,
"counterparty_height": counterparty_height,
"documentation": "https://counterpartycore.docs.apiary.io/",
"routes": f"{request.url_root}v2/routes",
"blueprint": "https://raw.githubusercontent.com/CounterpartyXCP/counterparty-core/refs/heads/master/apiary.apib",
}


def is_server_ready():
# TODO: find a way to mock this function for testing
try:
if request.url_root == "http://localhost:10009/":
return True
except RuntimeError:
pass
if BACKEND_HEIGHT is None:
return False
if util.CURRENT_BLOCK_INDEX in [BACKEND_HEIGHT, BACKEND_HEIGHT - 1]:
return True
if time.time() - CURRENT_BLOCK_TIME < 60:
return True
return False


def is_cachable(rule):
if rule.startswith("/v2/blocks"):
return True
Expand Down Expand Up @@ -155,9 +134,9 @@ def return_result(
api_result["error"] = error
response = flask.make_response(to_json(api_result), http_code)
response.headers["X-COUNTERPARTY-HEIGHT"] = util.CURRENT_BLOCK_INDEX
response.headers["X-COUNTERPARTY-READY"] = is_server_ready()
response.headers["X-COUNTERPARTY-READY"] = wsgi.is_server_ready()
response.headers["X-COUNTERPARTY-VERSION"] = config.VERSION_STRING
response.headers["X-BITCOIN-HEIGHT"] = BACKEND_HEIGHT
response.headers["X-BITCOIN-HEIGHT"] = wsgi.BACKEND_HEIGHT
response.headers["Content-Type"] = "application/json"
if not config.API_NO_ALLOW_CORS:
response.headers["Access-Control-Allow-Origin"] = "*"
Expand Down Expand Up @@ -225,9 +204,16 @@ def execute_api_function(db, rule, route, function_args):
) and not request.path.startswith("/v2/blocks/last"):
cache_key = request.url

if cache_key in BLOCK_CACHE:
result = BLOCK_CACHE[cache_key]
else:
with start_sentry_span(op="cache.get") as sentry_get_span:
sentry_get_span.set_data("cache.key", cache_key)
if cache_key in BLOCK_CACHE:
result = BLOCK_CACHE[cache_key]
sentry_get_span.set_data("cache.hit", True)
return result
else:
sentry_get_span.set_data("cache.hit", False)

with start_sentry_span(op="cache.put") as sentry_put_span:
if function_needs_db(route["function"]):
result = route["function"](db, **function_args)
else:
Expand All @@ -238,11 +224,12 @@ def execute_api_function(db, rule, route, function_args):
and route["function"].__name__ != "redirect_to_api_v1"
and not request.path.startswith("/v2/mempool/")
):
sentry_put_span.set_data("cache.key", cache_key)
BLOCK_CACHE[cache_key] = result
if len(BLOCK_CACHE) > MAX_BLOCK_CACHE_SIZE:
BLOCK_CACHE.popitem(last=False)

return result
return result


def get_transaction_name(rule):
Expand All @@ -253,18 +240,6 @@ def get_transaction_name(rule):
return "".join([part.capitalize() for part in ROUTES[rule]["function"].__name__.split("_")])


def refresh_current_block(db):
# update the current block index
global CURRENT_BLOCK_TIME # noqa F811
last_block = ledger.get_last_block(db)
if last_block:
util.CURRENT_BLOCK_INDEX = last_block["block_index"]
CURRENT_BLOCK_TIME = last_block["block_time"]
else:
util.CURRENT_BLOCK_INDEX = 0
CURRENT_BLOCK_TIME = 0


@auth.login_required
def handle_route(**kwargs):
start_time = time.time()
Expand All @@ -273,7 +248,7 @@ def handle_route(**kwargs):
logger.trace(f"API Request - {request.remote_addr} {request.method} {request.url}")
logger.debug(get_log_prefix(query_args))

if BACKEND_HEIGHT is None:
if wsgi.BACKEND_HEIGHT is None:
return return_result(
503,
error="Backend still not ready. Please try again later.",
Expand All @@ -286,7 +261,7 @@ def handle_route(**kwargs):
with configure_sentry_scope() as scope:
scope.set_transaction_name(get_transaction_name(rule))

if not is_server_ready() and not return_result_if_not_ready(rule):
if not wsgi.is_server_ready() and not return_result_if_not_ready(rule):
return return_result(
503, error="Counterparty not ready", start_time=start_time, query_args=query_args
)
Expand Down Expand Up @@ -373,15 +348,7 @@ def handle_doc():
return flask.send_file(BLUEPRINT_FILEPATH)


def run_api_server(args, interruped_value, server_ready_value):
sentry.init()
# Initialise log and config
server.initialise_log_and_config(argparse.Namespace(**args))

watcher = api_watcher.APIWatcher()
watcher.start()

logger.info("Starting API Server...")
def init_flask_app():
app = Flask(config.APP_NAME)
with app.app_context():
# Initialise the API access log
Expand All @@ -404,72 +371,47 @@ def run_api_server(args, interruped_value, server_ready_value):
app.add_url_rule(path, view_func=handle_route, methods=methods, strict_slashes=False)

app.register_error_handler(404, handle_not_found)
# run the scheduler to refresh the backend height
# `no_refresh_backend_height` used only for testing. TODO: find a way to mock it
timer_db = get_db_connection(config.API_DATABASE, read_only=True, check_wal=False)
if "no_refresh_backend_height" not in args or not args["no_refresh_backend_height"]:
refresh_backend_height(timer_db, start=True)
else:
refresh_current_block(timer_db)
global BACKEND_HEIGHT # noqa F811
BACKEND_HEIGHT = 0

return app


def run_api_server(args, interruped_value, server_ready_value):
sentry.init()
# Initialise log and config
server.initialise_log_and_config(argparse.Namespace(**args))

watcher = api_watcher.APIWatcher()
watcher.start()

logger.info("Starting API Server...")
app = init_flask_app()

try:
# Init the HTTP Server.
werkzeug_server = make_server(config.API_HOST, config.API_PORT, app, threaded=True)
ParentProcessChecker(interruped_value, werkzeug_server).start()
wsgi_server = wsgi.WSGIApplication(app, args=args)
ParentProcessChecker(interruped_value, wsgi_server).start()
app.app_context().push()
# Run app server (blocking)
server_ready_value.value = 1
werkzeug_server.serve_forever()
wsgi_server.run()
except KeyboardInterrupt:
logger.trace("Keyboard Interrupt!")
finally:
logger.trace("Shutting down API Server...")
watcher.stop()
watcher.join()
werkzeug_server.shutdown()
# ensure timer is cancelled
if BACKEND_HEIGHT_TIMER:
BACKEND_HEIGHT_TIMER.cancel()
if timer_db:
timer_db.close()
wsgi_server.stop()
APIDBConnectionPool().close()


def refresh_backend_height(db, start=False):
global BACKEND_HEIGHT, BACKEND_HEIGHT_TIMER # noqa F811
if not start:
BACKEND_HEIGHT = get_backend_height()
refresh_current_block(db)
backend.addrindexrs.clear_raw_transactions_cache()
if not is_server_ready():
if BACKEND_HEIGHT > util.CURRENT_BLOCK_INDEX:
logger.debug(
f"Counterparty is currently behind Bitcoin Core. ({util.CURRENT_BLOCK_INDEX} < {BACKEND_HEIGHT})"
)
else:
logger.debug(
f"Bitcoin Core is currently behind the network. ({util.CURRENT_BLOCK_INDEX} > {BACKEND_HEIGHT})"
)
else:
# starting the timer is not blocking in case of Addrindexrs is not ready
BACKEND_HEIGHT_TIMER = Timer(0.5, refresh_backend_height, (db,))
BACKEND_HEIGHT_TIMER.start()
return
if BACKEND_HEIGHT_TIMER:
BACKEND_HEIGHT_TIMER.cancel()
BACKEND_HEIGHT_TIMER = Timer(REFRESH_BACKEND_HEIGHT_INTERVAL, refresh_backend_height, (db,))
BACKEND_HEIGHT_TIMER.start()


# This thread is used for the following two reasons:
# 1. `docker-compose stop` does not send a SIGTERM to the child processes (in this case the API v2 process)
# 2. `process.terminate()` does not trigger a `KeyboardInterrupt` or execute the `finally` block.
class ParentProcessChecker(Thread):
def __init__(self, interruped_value, werkzeug_server):
def __init__(self, interruped_value, wsgi_server):
super().__init__()
self.interruped_value = interruped_value
self.werkzeug_server = werkzeug_server
self.wsgi_server = wsgi_server

def run(self):
try:
Expand All @@ -479,7 +421,7 @@ def run(self):
else:
logger.trace("Parent process is dead. Exiting...")
break
self.werkzeug_server.shutdown()
self.wsgi_server.stop()
except KeyboardInterrupt:
pass

Expand Down
14 changes: 1 addition & 13 deletions counterparty-core/counterpartycore/lib/api/api_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -817,19 +817,7 @@ def get_running_info():
else:
caught_up = True

try:
cursor = db.cursor()
blocks = list(
cursor.execute(
"""SELECT * FROM blocks WHERE block_index = ?""",
(util.CURRENT_BLOCK_INDEX,),
)
)
assert len(blocks) == 1
last_block = blocks[0]
cursor.close()
except: # noqa: E722
last_block = None
last_block = ledger.get_last_block(db)

try:
last_message = ledger.last_message(db)
Expand Down
Loading

0 comments on commit 679ab89

Please sign in to comment.