跳转至

request

core.request

请求描述符与批量请求容器. 提供对 API 请求的抽象与调度.

Request dataclass

Request(
    *,
    _client: Client,
    module: str,
    method: str,
    param: dict[str, Any] | dict[int, Any],
    response_model: type[BaseModel] | None = None,
    comm: dict[str, int | str | bool] | None = None,
    is_jce: bool = False,
    preserve_bool: bool = False,
    credential: Credential | None = None,
    platform: Platform | None = None,
)

Bases: Generic[RequestResultT]

请求描述符.

replace

replace(**changes: Any) -> Request[RequestResultT]

返回一个应用了修改的新 Request 对象, 不会修改原对象.

Source code in qqmusic_api/core/request.py
def replace(self, **changes: Any) -> "Request[RequestResultT]":
    """返回一个应用了修改的新 Request 对象, 不会修改原对象."""
    if "param" not in changes:
        changes["param"] = copy.deepcopy(self.param)
    if "comm" not in changes and self.comm is not None:
        changes["comm"] = copy.deepcopy(self.comm)
    return dc_replace(self, **changes)

PaginatedRequest dataclass

PaginatedRequest(
    pager_meta: PagerMeta,
    *,
    _client: Client,
    module: str,
    method: str,
    param: dict[str, Any] | dict[int, Any],
    response_model: type[BaseModel] | None = None,
    comm: dict[str, int | str | bool] | None = None,
    is_jce: bool = False,
    preserve_bool: bool = False,
    credential: Credential | None = None,
    platform: Platform | None = None,
)

Bases: Request[RequestResultT]

声明了连续翻页能力的请求描述符.

get_pager_meta

get_pager_meta() -> PagerMeta

返回连续翻页元数据.

Source code in qqmusic_api/core/request.py
def get_pager_meta(self) -> PagerMeta:
    """返回连续翻页元数据."""
    return self.pager_meta

paginate

paginate(
    limit: int | None = None,
) -> ResponsePager[RequestResultT]

返回响应的分页迭代器.

PARAMETER DESCRIPTION
limit

最大获取页数.

TYPE: int | None DEFAULT: None

Source code in qqmusic_api/core/request.py
def paginate(self, limit: int | None = None) -> ResponsePager[RequestResultT]:
    """返回响应的分页迭代器.

    Args:
        limit: 最大获取页数.
    """
    return ResponsePager(self, limit=limit)

RefreshableRequest dataclass

RefreshableRequest(
    refresh_meta: RefreshMeta,
    *,
    _client: Client,
    module: str,
    method: str,
    param: dict[str, Any] | dict[int, Any],
    response_model: type[BaseModel] | None = None,
    comm: dict[str, int | str | bool] | None = None,
    is_jce: bool = False,
    preserve_bool: bool = False,
    credential: Credential | None = None,
    platform: Platform | None = None,
)

Bases: Request[RequestResultT]

声明了换一批能力的请求描述符.

get_refresh_meta

get_refresh_meta() -> RefreshMeta

返回换一批元数据.

Source code in qqmusic_api/core/request.py
def get_refresh_meta(self) -> RefreshMeta:
    """返回换一批元数据."""
    return self.refresh_meta

refresh

refresh() -> ResponseRefresher[RequestResultT]

返回响应的换一批控制器.

Source code in qqmusic_api/core/request.py
def refresh(self) -> ResponseRefresher[RequestResultT]:
    """返回响应的换一批控制器."""
    return ResponseRefresher(self)

RequestGroupResult dataclass

RequestGroupResult(
    index: int,
    module: str,
    method: str,
    success: bool,
    data: RequestResult | None = None,
    error: Exception | None = None,
)

批量请求中的单条结果.

RequestGroup dataclass

RequestGroup(
    _client: Client,
    batch_size: int = 20,
    max_inflight_batches: int = 5,
    _requests: list[Request[Any]] = list(),
    _grouped_requests: dict[
        BaseGroupKey, list[tuple[int, Request[Any]]]
    ] = dict(),
)

批量请求容器.

会按请求的 platformcredentialcommis_jce 自动分组,
并按 batch_size 自动分批发送.

add

add(request: Request[Any]) -> RequestGroup

添加请求.

PARAMETER DESCRIPTION
request

待执行的请求描述符.

TYPE: Request[Any]

RETURNS DESCRIPTION
RequestGroup

当前 RequestGroup, 用于链式调用.

Source code in qqmusic_api/core/request.py
def add(self, request: Request[Any]) -> "RequestGroup":
    """添加请求.

    Args:
        request: 待执行的请求描述符.

    Returns:
        当前 RequestGroup, 用于链式调用.
    """
    index = len(self._requests)
    self._requests.append(request)
    group_key = self._group_key(request)
    self._grouped_requests.setdefault(group_key, []).append((index, request))
    return self

extend

extend(requests: list[Request[Any]]) -> RequestGroup

批量添加请求.

PARAMETER DESCRIPTION
requests

待执行请求列表.

TYPE: list[Request[Any]]

RETURNS DESCRIPTION
RequestGroup

当前 RequestGroup, 用于链式调用.

Source code in qqmusic_api/core/request.py
def extend(self, requests: list[Request[Any]]) -> "RequestGroup":
    """批量添加请求.

    Args:
        requests: 待执行请求列表.

    Returns:
        当前 RequestGroup, 用于链式调用.
    """
    for request in requests:
        self.add(request)
    return self

execute async

execute() -> list[RequestResult | Exception]

执行当前批量请求并返回混合结果列表.

RETURNS DESCRIPTION
list[RequestResult | Exception]

list[RequestResult | Exception]: 与请求添加顺序一致的结果列表.

list[RequestResult | Exception]

成功项为响应数据, 失败项为异常对象.

Source code in qqmusic_api/core/request.py
async def execute(self) -> list[RequestResult | Exception]:
    """执行当前批量请求并返回混合结果列表.

    Returns:
        list[RequestResult | Exception]: 与请求添加顺序一致的结果列表.
        成功项为响应数据, 失败项为异常对象.
    """
    if not self._requests:
        return []

    results: list[RequestResult | Exception | None] = [None] * len(self._requests)
    async for result in self.execute_iter():
        if result.success:
            results[result.index] = result.data
        else:
            if result.error is None:
                raise ApiError(
                    "批量请求失败结果缺少异常对象",
                    code=-1,
                    data={"index": result.index, "module": result.module, "method": result.method},
                )
            results[result.index] = result.error

    finalized: list[RequestResult | Exception] = []
    for index, item in enumerate(results):
        if item is None:
            request = self._requests[index]
            raise RequestGroupResultMissingError(
                "批量请求结果存在未填充项",
                context={"index": index, "module": request.module, "method": request.method},
            )
        finalized.append(item)
    return finalized

execute_iter async

execute_iter() -> AsyncIterator[RequestGroupResult]

执行当前批量请求并按完成先后流式返回结果.

Source code in qqmusic_api/core/request.py
async def execute_iter(self) -> AsyncIterator[RequestGroupResult]:
    """执行当前批量请求并按完成先后流式返回结果."""
    if not self._requests:
        return

    send_stream, receive_stream = anyio.create_memory_object_stream[RequestGroupResult](len(self._requests))
    limiter = anyio.CapacityLimiter(self.max_inflight_batches)

    async with receive_stream, anyio.create_task_group() as task_group:
        async with send_stream:
            for batch_slice in self._iter_batches(self._grouped_requests):
                task_group.start_soon(self._stream_batch_results, batch_slice, limiter, send_stream.clone())
            await send_stream.aclose()

        async for result in receive_stream:
            yield result