Gitee/searx/network/__init__.py
Markus Heiser edfbf1e118 [refactor] typification of SearXNG (initial) / result items (part 1)
Typification of SearXNG
=======================

This patch introduces the typing of the results.  The why and how are described
in the documentation; please generate the documentation:

    $ make docs.clean docs.live

and read the following articles in the "Developer documentation":

- result types --> http://0.0.0.0:8000/dev/result_types/index.html

The result types are available from the `searx.result_types` module.  The
following have been implemented so far:

- base result type: `searx.result_types.Result`
  --> http://0.0.0.0:8000/dev/result_types/base_result.html

- answer results
  --> http://0.0.0.0:8000/dev/result_types/answer.html

including the type for translations (inspired by #3925).  For all other
types (which still need to be set up in subsequent PRs), template documentation
has been created for the transition period.

Doc of the fields used in Templates
===================================

The template documentation is the basis for the typing and is the first complete
documentation of the results (needed for engine development).  It is the
"working paper" (the plan) with which further typifications can be implemented
in subsequent PRs.

- https://github.com/searxng/searxng/issues/357

Answer Templates
================

With the new (sub) types for `Answer`, the templates for the answers have also
been revised; `Translation` results are now displayed with collapsible entries
(inspired by #3925).

    !en-de dog

Plugins & Answerer
==================

The implementation for `Plugin` and `Answer` has been revised, see
documentation:

- Plugin: http://0.0.0.0:8000/dev/plugins/index.html
- Answerer: http://0.0.0.0:8000/dev/answerers/index.html

`PluginStorage` and `AnswerStorage` manage those items (in follow-up PRs,
`ArticleStorage`, `InfoStorage` and .. will be implemented).

Autocomplete
============

The autocompletion had a bug: results from `Answer` were not shown.  To test,
activate autocompletion and try search terms for which we have answerers:

- statistics: type `min 1 2 3` .. in the completion list you should find an
  entry like `[de] min(1, 2, 3) = 1`

- random: type `random uuid` .. in the completion list, the first item is a
  random UUID

Extended Types
==============

SearXNG extends, e.g., the request and response types of flask and httpx.  A
module has been set up for these type extensions:

- Extended Types
  --> http://0.0.0.0:8000/dev/extended_types.html

Unit-Tests
==========

The unit tests have been completely revised.  In the previous implementation,
the runtime (the global variables such as `searx.settings`) was not initialized
before each test, so the runtime environment with which a test ran was always
determined by the tests that ran before it.  This was also the reason why we
sometimes had to observe non-deterministic errors in the tests in the past:

- https://github.com/searxng/searxng/issues/2988 is one example for the Runtime
  issues, with non-deterministic behavior ..

- https://github.com/searxng/searxng/pull/3650
- https://github.com/searxng/searxng/pull/3654
- https://github.com/searxng/searxng/pull/3642#issuecomment-2226884469
- https://github.com/searxng/searxng/pull/3746#issuecomment-2300965005
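
The order dependence described above can be illustrated with a minimal sketch.
It assumes a hypothetical module-level `SETTINGS` dict and an `init_runtime()`
helper as stand-ins for the real runtime (such as `searx.settings`); neither
name is taken from the SearXNG test suite.  Because `setUp` re-initializes the
global state, the second test passes even though the first one mutates it:

```python
import copy
import unittest

# Hypothetical stand-in for a global runtime such as `searx.settings`.
DEFAULT_SETTINGS = {"search": {"autocomplete": ""}}
SETTINGS = copy.deepcopy(DEFAULT_SETTINGS)


def init_runtime():
    """Reset the global runtime to a known state (illustrative helper)."""
    SETTINGS.clear()
    SETTINGS.update(copy.deepcopy(DEFAULT_SETTINGS))


class IsolatedTestCase(unittest.TestCase):
    def setUp(self):
        # Re-initialize before every test, so the test order no longer matters.
        init_runtime()

    def test_mutates_runtime(self):
        SETTINGS["search"]["autocomplete"] = "duckduckgo"
        self.assertEqual(SETTINGS["search"]["autocomplete"], "duckduckgo")

    def test_sees_defaults(self):
        # Passes regardless of whether test_mutates_runtime ran first.
        self.assertEqual(SETTINGS["search"]["autocomplete"], "")


def run_suite():
    """Load and run both tests, returning the unittest result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(IsolatedTestCase)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```

Without the `setUp` call, `test_sees_defaults` would only pass when it happens
to run before `test_mutates_runtime`.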

Why msgspec.Struct
==================

We have already discussed typing based on e.g. `TypedDict` or `dataclass` in the past:

- https://github.com/searxng/searxng/pull/1562/files
- https://gist.github.com/dalf/972eb05e7a9bee161487132a7de244d2
- https://github.com/searxng/searxng/pull/1412/files
- https://github.com/searxng/searxng/pull/1356

In my opinion, `TypedDict` is unsuitable because the objects are still dictionaries
and not instances of classes / the `dataclass` types are classes but ...
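
A minimal sketch of the "still dictionaries" point, using only the standard
library.  The names `ResultDict` and `ResultClass` are hypothetical and are not
part of SearXNG:

```python
from dataclasses import dataclass
from typing import TypedDict


class ResultDict(TypedDict):
    """Hypothetical result described as a TypedDict."""

    url: str
    title: str


@dataclass
class ResultClass:
    """Hypothetical result described as a dataclass."""

    url: str
    title: str


as_dict = ResultDict(url="https://example.org", title="Example")
as_obj = ResultClass(url="https://example.org", title="Example")

# A TypedDict is only a hint for static type checkers: at runtime it is a dict.
print(type(as_dict) is dict)            # True
print(isinstance(as_obj, ResultClass))  # True

# No runtime validation happens for the TypedDict: a wrong type is accepted.
unchecked = ResultDict(url=42, title="Example")
print(unchecked["url"])                 # 42
```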

`msgspec.Struct` types combine the advantages of typing and runtime behaviour,
and also offer (fast) serialization of the objects (incl. type checking).

Currently not possible but conceivable with `msgspec`: outsourcing the engines
into separate processes.  What possibilities this opens up in the future is
left to the imagination!

Internally, we have already defined that it is desirable to decouple the
development of the engines from the development of the SearXNG core / The
serialization of the `Result` objects is a prerequisite for this.

HINT: The threads listed above were the template for this PR, even though the
implementation here is based on msgspec.  They should also be an inspiration for
the following PRs of typification, as the models and implementations can provide
a good direction.

Why just one commit?
====================

I tried to create several (thematically separated) commits, but gave up at some
point ... there are too many things to tackle at once / The comprehensibility of
the commits would not be improved by a thematic separation. On the contrary, we
would have to make multiple changes at the same places and the goal of a change
would be vaguely recognizable in the fog of the commits.

Signed-off-by: Markus Heiser <markus.heiser@darmarit.de>
2025-01-28 07:07:08 +01:00


# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring, global-statement

import asyncio
import threading
import concurrent.futures
from queue import SimpleQueue
from types import MethodType
from timeit import default_timer
from typing import Iterable, NamedTuple, Tuple, List, Dict, Union
from contextlib import contextmanager

import httpx
import anyio

from searx.extended_types import SXNG_Response
from .network import get_network, initialize, check_network_configuration  # pylint:disable=cyclic-import
from .client import get_loop
from .raise_for_httperror import raise_for_httperror


THREADLOCAL = threading.local()
"""Thread-local data is data for thread specific values."""


def reset_time_for_thread():
    THREADLOCAL.total_time = 0


def get_time_for_thread():
    """returns thread's total time or None"""
    return THREADLOCAL.__dict__.get('total_time')


def set_timeout_for_thread(timeout, start_time=None):
    THREADLOCAL.timeout = timeout
    THREADLOCAL.start_time = start_time


def set_context_network_name(network_name):
    THREADLOCAL.network = get_network(network_name)


def get_context_network():
    """If set return thread's network.

    If unset, return value from :py:obj:`get_network`.
    """
    return THREADLOCAL.__dict__.get('network') or get_network()


@contextmanager
def _record_http_time():
    # pylint: disable=too-many-branches
    time_before_request = default_timer()
    start_time = getattr(THREADLOCAL, 'start_time', time_before_request)
    try:
        yield start_time
    finally:
        # update total_time.
        # See get_time_for_thread() and reset_time_for_thread()
        if hasattr(THREADLOCAL, 'total_time'):
            time_after_request = default_timer()
            THREADLOCAL.total_time += time_after_request - time_before_request


def _get_timeout(start_time, kwargs):
    # pylint: disable=too-many-branches

    # timeout (httpx)
    if 'timeout' in kwargs:
        timeout = kwargs['timeout']
    else:
        timeout = getattr(THREADLOCAL, 'timeout', None)
        if timeout is not None:
            kwargs['timeout'] = timeout

    # 2 minutes timeout for the requests without timeout
    timeout = timeout or 120

    # adjust actual timeout
    timeout += 0.2  # overhead
    if start_time:
        timeout -= default_timer() - start_time

    return timeout


def request(method, url, **kwargs) -> SXNG_Response:
    """same as requests/requests/api.py request(...)"""
    with _record_http_time() as start_time:
        network = get_context_network()
        timeout = _get_timeout(start_time, kwargs)
        future = asyncio.run_coroutine_threadsafe(network.request(method, url, **kwargs), get_loop())
        try:
            return future.result(timeout)
        except concurrent.futures.TimeoutError as e:
            raise httpx.TimeoutException('Timeout', request=None) from e


def multi_requests(request_list: List["Request"]) -> List[Union[httpx.Response, Exception]]:
    """send multiple HTTP requests in parallel. Wait for all requests to finish."""
    with _record_http_time() as start_time:
        # send the requests
        network = get_context_network()
        loop = get_loop()
        future_list = []
        for request_desc in request_list:
            timeout = _get_timeout(start_time, request_desc.kwargs)
            future = asyncio.run_coroutine_threadsafe(
                network.request(request_desc.method, request_desc.url, **request_desc.kwargs), loop
            )
            future_list.append((future, timeout))

        # read the responses
        responses = []
        for future, timeout in future_list:
            try:
                responses.append(future.result(timeout))
            except concurrent.futures.TimeoutError:
                responses.append(httpx.TimeoutException('Timeout', request=None))
            except Exception as e:  # pylint: disable=broad-except
                responses.append(e)
        return responses


class Request(NamedTuple):
    """Request description for the multi_requests function"""

    method: str
    url: str
    kwargs: Dict[str, str] = {}

    @staticmethod
    def get(url, **kwargs):
        return Request('GET', url, kwargs)

    @staticmethod
    def options(url, **kwargs):
        return Request('OPTIONS', url, kwargs)

    @staticmethod
    def head(url, **kwargs):
        return Request('HEAD', url, kwargs)

    @staticmethod
    def post(url, **kwargs):
        return Request('POST', url, kwargs)

    @staticmethod
    def put(url, **kwargs):
        return Request('PUT', url, kwargs)

    @staticmethod
    def patch(url, **kwargs):
        return Request('PATCH', url, kwargs)

    @staticmethod
    def delete(url, **kwargs):
        return Request('DELETE', url, kwargs)


def get(url, **kwargs) -> SXNG_Response:
    kwargs.setdefault('allow_redirects', True)
    return request('get', url, **kwargs)


def options(url, **kwargs) -> SXNG_Response:
    kwargs.setdefault('allow_redirects', True)
    return request('options', url, **kwargs)


def head(url, **kwargs) -> SXNG_Response:
    kwargs.setdefault('allow_redirects', False)
    return request('head', url, **kwargs)


def post(url, data=None, **kwargs) -> SXNG_Response:
    return request('post', url, data=data, **kwargs)


def put(url, data=None, **kwargs) -> SXNG_Response:
    return request('put', url, data=data, **kwargs)


def patch(url, data=None, **kwargs) -> SXNG_Response:
    return request('patch', url, data=data, **kwargs)


def delete(url, **kwargs) -> SXNG_Response:
    return request('delete', url, **kwargs)


async def stream_chunk_to_queue(network, queue, method, url, **kwargs):
    try:
        async with await network.stream(method, url, **kwargs) as response:
            queue.put(response)
            # aiter_raw: access the raw bytes on the response without applying any HTTP content decoding
            # https://www.python-httpx.org/quickstart/#streaming-responses
            async for chunk in response.aiter_raw(65536):
                if len(chunk) > 0:
                    queue.put(chunk)
    except (httpx.StreamClosed, anyio.ClosedResourceError):
        # the response was queued before the exception.
        # the exception was raised on aiter_raw.
        # we do nothing here: in the finally block, None will be queued
        # so stream(method, url, **kwargs) generator can stop
        pass
    except Exception as e:  # pylint: disable=broad-except
        # broad except to avoid this scenario:
        # exception in network.stream(method, url, **kwargs)
        # -> the exception is not caught here
        # -> queue None (in finally)
        # -> the function below, stream(method, url, **kwargs), has nothing to return
        queue.put(e)
    finally:
        queue.put(None)


def _stream_generator(method, url, **kwargs):
    queue = SimpleQueue()
    network = get_context_network()
    future = asyncio.run_coroutine_threadsafe(stream_chunk_to_queue(network, queue, method, url, **kwargs), get_loop())

    # yield chunks
    obj_or_exception = queue.get()
    while obj_or_exception is not None:
        if isinstance(obj_or_exception, Exception):
            raise obj_or_exception
        yield obj_or_exception
        obj_or_exception = queue.get()
    future.result()


def _close_response_method(self):
    asyncio.run_coroutine_threadsafe(self.aclose(), get_loop())
    # reach the end of self._generator (_stream_generator) to avoid a memory leak.
    # it makes sure that:
    # * the httpx response is closed (see the stream_chunk_to_queue function)
    # * future.result() is called in _stream_generator
    for _ in self._generator:  # pylint: disable=protected-access
        continue


def stream(method, url, **kwargs) -> Tuple[httpx.Response, Iterable[bytes]]:
    """Replace httpx.stream.

    Usage:
    response, stream = poolrequests.stream(...)
    for chunk in stream:
        ...

    httpx.Client.stream requires to write the httpx.HTTPTransport version of
    the httpx.AsyncHTTPTransport declared above.
    """
    generator = _stream_generator(method, url, **kwargs)

    # yield response
    response = next(generator)  # pylint: disable=stop-iteration-return
    if isinstance(response, Exception):
        raise response

    response._generator = generator  # pylint: disable=protected-access
    response.close = MethodType(_close_response_method, response)

    return response, generator
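
The hand-over between the asyncio loop and the calling thread used by
`stream_chunk_to_queue` and `_stream_generator` can be tried in isolation.  The
following is a minimal sketch with only the standard library, not the module's
implementation: `start_background_loop`, `produce_chunks` and `stream_chunks`
are hypothetical names, the background loop stands in for `get_loop()`, and a
plain list of byte strings replaces the HTTP response.

```python
import asyncio
import threading
from queue import SimpleQueue


def start_background_loop() -> asyncio.AbstractEventLoop:
    """Run an event loop in a daemon thread (stand-in for get_loop())."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


async def produce_chunks(queue: SimpleQueue, chunks):
    """Async producer: queue each chunk, then None as the end marker."""
    try:
        for chunk in chunks:
            await asyncio.sleep(0)  # pretend to wait for the network
            queue.put(chunk)
    except Exception as exc:  # hand errors over to the consumer thread
        queue.put(exc)
    finally:
        queue.put(None)


def stream_chunks(loop, chunks):
    """Sync generator: yield what the coroutine queues until None arrives."""
    queue = SimpleQueue()
    future = asyncio.run_coroutine_threadsafe(produce_chunks(queue, chunks), loop)
    item = queue.get()
    while item is not None:
        if isinstance(item, Exception):
            raise item
        yield item
        item = queue.get()
    future.result()  # the coroutine has finished once None was queued


LOOP = start_background_loop()
received = list(stream_chunks(LOOP, [b"ab", b"cd", b"ef"]))
print(received)  # [b'ab', b'cd', b'ef']
```

The `None` sentinel queued in the `finally` block is what lets the synchronous
generator terminate in every case, including errors.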