Files
odoo-stubs/odoo-stubs/sql_db.pyi
Trinh Anh Ngoc 558e8e6db5 Cleanup
2023-05-19 18:22:39 +07:00

104 lines
3.4 KiB
Python

from re import Pattern
from threading import RLock
from typing import Any, Callable, Generator, Iterable, Iterator, NoReturn, TypeVar
import psycopg2.extensions
from decorator import decorator
from .tools import Callbacks
# Generic placeholder for the return type of wrapped callables (used by
# `check` and `ConnectionPool.locked`).
_T = TypeVar("_T")
# Self-type for `BaseCursor.__enter__`. The bound has to be the class that
# declares the method: with `bound=Cursor` the annotation is rejected on
# `BaseCursor` itself (the bound is a subclass, not a supertype, of the
# declaring class) and `TestCursor`, which derives from `BaseCursor` but not
# from `Cursor`, could never satisfy it.
_CursorT = TypeVar("_CursorT", bound=BaseCursor)
# Takes a raw value `symb` plus a cursor and yields a `str`, or `None`.
# NOTE(review): the (value, cursor) signature looks like a psycopg2
# typecaster callback — confirm against the runtime module.
def unbuffer(symb, cr: BaseCursor) -> str | None: ...
# Takes a raw value `symb` plus a cursor and yields a `float`, or `None`.
# NOTE(review): the (value, cursor) signature looks like a psycopg2
# typecaster callback — confirm against the runtime module.
def undecimalize(symb, cr: BaseCursor) -> float | None: ...
# Returns a psycopg2 `QuotedString` for the given Python `str`.
def adapt_string(adapted: str) -> psycopg2.extensions.QuotedString: ...
# Operates on the cursor `cr`; `clear` is an optional, keyword-only flag.
# NOTE(review): presumably flushes the environment(s) attached to the
# cursor's transaction and optionally clears them — confirm.
def flush_env(cr: BaseCursor, *, clear: bool = ...) -> None: ...
# Operates on the cursor `cr` and returns nothing.
# NOTE(review): presumably clears the environment(s) attached to the
# cursor's transaction — confirm.
def clear_env(cr: BaseCursor) -> None: ...
# Module-level compiled regular expressions.
# NOTE(review): judging by the names and by `Cursor.sql_from_log` /
# `Cursor.sql_into_log`, presumably used to pick table names out of
# FROM / INTO clauses for query statistics — confirm.
# TODO(review): `Pattern` is left unparametrized; narrow to `Pattern[str]` or
# `Pattern[bytes]` once the pattern type is confirmed.
re_from: Pattern
re_into: Pattern
# Module-level integer counter.
# NOTE(review): presumably the number of executed queries — confirm.
sql_counter: int
# Method wrapper built with the third-party `decorator` package: `f` is the
# wrapped callable and `self` the `Cursor` it is invoked on; the result has
# the wrapped callable's return type (`_T`).
# NOTE(review): the runtime check performed before delegating to `f` is not
# visible in this stub — presumably it guards against using a closed cursor.
@decorator
def check(f: Callable[..., _T], self: Cursor, *args, **kwargs) -> _T: ...
class BaseCursor:
    # Common typing surface inherited by `Cursor` and `TestCursor`.

    # One `Callbacks` container per transaction event.
    # NOTE(review): presumably run before/after commit and rollback, as the
    # names suggest — confirm against the runtime module.
    precommit: Callbacks
    postcommit: Callbacks
    prerollback: Callbacks
    postrollback: Callbacks

    def __init__(self) -> None: ...

    # Query and transaction primitives; return types are left unannotated
    # here and are overridden by the subclasses.
    def execute(self, query, *args, **kwargs): ...
    def commit(self): ...
    def rollback(self): ...
    def close(self): ...

    # NOTE(review): typed as a plain generator. If the runtime method is
    # wrapped with `contextlib.contextmanager`, `with cr.savepoint():` will
    # not type-check against this signature — confirm and adjust.
    def savepoint(self, flush: bool = ...) -> Generator[None, None, None]: ...

    # Context-manager protocol: `__enter__` is typed to hand back the
    # caller's own cursor type via the `_CursorT` self-type.
    def __enter__(self: _CursorT) -> _CursorT: ...
    def __exit__(self, exc_type, exc_value, traceback) -> None: ...
class Cursor(BaseCursor):
    # Cursor type handed out by `Connection.cursor`.

    # NOTE(review): presumably the default chunk size used by
    # `split_for_in_conditions` — confirm against the runtime module.
    IN_MAX: int

    # Query-logging attributes.
    # NOTE(review): the two dicts presumably hold statistics for FROM and
    # INTO queries respectively — confirm.
    sql_from_log: dict
    sql_into_log: dict
    sql_log: bool
    sql_log_count: int

    dbname: str
    cache: dict

    # Built from the owning pool, the database name and a DSN dict;
    # `serialized` is optional.
    def __init__(
        self, pool: ConnectionPool, dbname: str, dsn: dict, serialized: bool = ...
    ) -> None: ...

    # Row fetchers returning rows as `dict[str, Any]`; `dictfetchone` may
    # return `None`.
    def dictfetchone(self) -> dict[str, Any] | None: ...
    def dictfetchmany(self, size) -> list[dict[str, Any]]: ...
    def dictfetchall(self) -> list[dict[str, Any]]: ...

    def __del__(self) -> None: ...

    # Narrows `BaseCursor.execute` to explicit `params` / `log_exceptions`
    # arguments, both optional.
    def execute(
        self, query, params: Any | None = ..., log_exceptions: Any | None = ...
    ): ...

    # Iterates over tuples built from `ids`; `size` is optional.
    # NOTE(review): presumably yields chunks of at most `size` ids for use
    # in SQL `IN` conditions — confirm.
    def split_for_in_conditions(
        self, ids: Iterable, size: int | None = ...
    ) -> Iterator[tuple]: ...

    def print_log(self): ...
    def close(self): ...
    def autocommit(self, on: bool) -> None: ...

    # Registers `func` for the named `event`.
    # NOTE(review): presumably a legacy front-end to the `Callbacks`
    # containers declared on `BaseCursor` — confirm.
    def after(self, event: str, func: Callable) -> None: ...

    def commit(self): ...
    def rollback(self): ...

    # Any attribute not declared above resolves to an unannotated
    # (implicitly `Any`) value.
    # NOTE(review): presumably delegates to the wrapped psycopg2 cursor.
    def __getattr__(self, name: str): ...

    @property
    def closed(self) -> bool: ...
class TestCursor(BaseCursor):
    # Cursor variant constructed around an existing `Cursor` and an `RLock`.
    # NOTE(review): presumably used by the test framework to share one real
    # cursor between threads — confirm against the runtime module.

    def __init__(self, cursor: Cursor, lock: RLock) -> None: ...

    # Transaction primitives, all typed as returning nothing.
    def close(self) -> None: ...
    def autocommit(self, on: bool) -> None: ...
    def commit(self) -> None: ...
    def rollback(self) -> None: ...

    # Any attribute not declared above resolves to an unannotated
    # (implicitly `Any`) value.
    # NOTE(review): presumably delegates to the wrapped `Cursor`.
    def __getattr__(self, name: str): ...
# psycopg2 connection subclass; the stub declares no additional members.
# It is the connection type borrowed from and given back to `ConnectionPool`.
class PsycoConnection(psycopg2.extensions.connection): ...
class ConnectionPool:
    # Pool of `PsycoConnection` objects; `maxconn` is an optional size
    # argument of the constructor.

    # Declared without `self`: takes a callable and returns a callable with
    # the same return type, i.e. it is used as a plain function inside the
    # class body.
    # NOTE(review): presumably a decorator that serializes access to the
    # pool with a lock — confirm against the runtime module.
    def locked(fun: Callable[..., _T]) -> Callable[..., _T]: ...

    def __init__(self, maxconn: int = ...) -> None: ...

    # Hands out a connection for the given connection-info dict.
    def borrow(self, connection_info: dict) -> PsycoConnection: ...

    # Returns a borrowed connection; `keep_in_pool` is optional.
    def give_back(
        self, connection: PsycoConnection, keep_in_pool: bool = ...
    ) -> None: ...

    # `dsn` is optional.
    # NOTE(review): presumably restricts closing to connections matching
    # that DSN when given — confirm.
    def close_all(self, dsn: dict | None = ...) -> None: ...
class Connection:
    # Handle on one database, built from a pool, a database name and a DSN
    # dict; it is the type returned by `db_connect`.

    def __init__(self, pool: ConnectionPool, dbname: str, dsn: dict) -> None: ...

    # Read-only views of two of the constructor arguments.
    @property
    def dsn(self) -> dict: ...

    @property
    def dbname(self) -> str: ...

    # Produces a `Cursor`; `serialized` is optional.
    def cursor(self, serialized: bool = ...) -> Cursor: ...

    # Alias bound to the very same method object as `cursor`.
    serialized_cursor = cursor

    # Typed `NoReturn`: truth-testing a `Connection` never returns normally.
    def __bool__(self) -> NoReturn: ...
# Maps a database name or URI to a `(str, dict)` pair.
# NOTE(review): presumably (database name, connection parameters) — confirm.
def connection_info_for(db_or_uri: str) -> tuple[str, dict]: ...
# Returns a `Connection` for the target `to`; `allow_uri` is optional.
# NOTE(review): presumably `allow_uri` controls whether `to` may be a
# connection URI rather than a plain database name — confirm.
def db_connect(to: str, allow_uri: bool = ...) -> Connection: ...
# Takes a database name and returns nothing.
# NOTE(review): presumably closes the pooled connections to that database.
def close_db(db_name: str) -> None: ...
# Takes no arguments and returns nothing.
# NOTE(review): presumably closes every pooled connection — confirm.
def close_all() -> None: ...