vendor_fabric.base¶
Base class for all vendor fabric.
This module provides ConnectorBase - the foundation for ALL connectors in the package connector fabric. It extends InputProvider and provides:
Credential loading from env vars, stdin, or direct inputs
HTTP client with retries and rate limiting
Provider capability metadata and dispatch hooks
ALL connectors should extend this class instead of InputProvider directly.
Usage: from extended_data import ExtendedDict from vendor_fabric import ConnectorBase
class MyConnector(ConnectorBase):
API_KEY_ENV = "MY_API_KEY" # Required env var name
BASE_URL = "https://api.example.com"
def __init__(self, api_key: str | None = None, **kwargs):
super().__init__(**kwargs)
self._api_key = api_key or self.get_input(self.API_KEY_ENV, required=True)
def my_operation(self) -> ExtendedDict:
return self.request_data("GET", "/endpoint", suffix="json")
Module Contents¶
Classes¶
Base class for all vendor fabric. |
API¶
- exception vendor_fabric.base.RateLimitError¶
Bases:
ExceptionRaised when API rate limit is hit - triggers retry.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- add_note()¶
- class args¶
- with_traceback()¶
- exception vendor_fabric.base.ConnectorAPIError(message: str, status_code: int | None = None)¶
Bases:
ExceptionRaised when API returns an error.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- add_note()¶
- class args¶
- with_traceback()¶
- class vendor_fabric.base.ConnectorBase(api_key: str | None = None, base_url: str | None = None, timeout: float | None = None, logger: extended_data.logging.Logging | None = None, **kwargs: Any)¶
Bases:
vendor_fabric.capabilities.CapabilityProviderMixin,extended_data.inputs.InputProvider,abc.ABCBase class for all vendor fabric.
Provides:
InputProvider for credential loading (env, stdin, direct)
HTTP client with connection pooling
Automatic retries with exponential backoff
Rate limiting
Provider capability registration scaffolding
Class Attributes: BASE_URL: API base URL (required for HTTP connectors) API_KEY_ENV: Environment variable name for API key CONNECTOR_CATEGORY: Catalog category for registry metadata CONNECTOR_CAPABILITIES: Catalog capabilities for registry metadata TIMEOUT: HTTP timeout in seconds (default 300) MIN_REQUEST_INTERVAL: Minimum seconds between requests (rate limiting) MAX_RETRIES: Maximum retry attempts (default 5)
Instance Attributes: logger: Logger instance _client: HTTP client (lazy-initialized)
Initialization
Initialize the connector.
Args: api_key: API key (overrides environment variable) base_url: Base URL (overrides class default) timeout: HTTP timeout in seconds logger: Logger instance **kwargs: Passed to InputProvider
- BASE_URL: ClassVar[str] = <Multiline-String>¶
- API_KEY_ENV: ClassVar[str] = <Multiline-String>¶
- CONNECTOR_CATEGORY: ClassVar[str] = 'external'¶
- CONNECTOR_CAPABILITIES: ClassVar[tuple[str, ...]] = ()¶
- TIMEOUT: ClassVar[float] = 300.0¶
- MIN_REQUEST_INTERVAL: ClassVar[float] = 0.0¶
- MAX_RETRIES: ClassVar[int] = 5¶
- property api_key: str¶
Get API key, raising if not set.
- property client: httpx.Client¶
Get or create HTTP client with connection pooling.
- close() None¶
Close HTTP client and release resources.
- request(method: str, endpoint: str, *, headers: dict[str, str] | None = None, **kwargs: Any) httpx.Response¶
Make HTTP request with retries and rate limiting.
Args: method: HTTP method (GET, POST, PUT, DELETE, etc.) endpoint: API endpoint (relative to BASE_URL) headers: Additional headers (merged with defaults) **kwargs: Passed to httpx.request (json, params, data, etc.)
Returns: httpx.Response
Raises: RateLimitError: On 429 or 5xx responses after retries are exhausted. ConnectorAPIError: On other API errors.
- decode_response(response: httpx.Response, *, suffix: str | None = None, as_extended: bool = True) Any¶
Decode an HTTP response body through the extended-data IO layer.
Structured response bodies are decoded from JSON, YAML, TOML, or HCL and promoted through the ExtendedData root by default. Text responses become raw strings, and unknown binary responses remain bytes.
- decode_response_file(response: httpx.Response, *, source: str | None = None, suffix: str | None = None, as_extended: bool = True, metadata: collections.abc.Mapping[str, Any] | None = None) extended_data.io.DataFile¶
Decode an HTTP response body into a DataFile artifact with provenance.
- extend_result(value: Any) Any¶
Promote connector data payloads through the ExtendedData root.
- request_data(method: str, endpoint: str, *, headers: dict[str, str] | None = None, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) Any¶
Make an HTTP request and return decoded response data.
- request_data_file(method: str, endpoint: str, *, headers: dict[str, str] | None = None, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) extended_data.io.DataFile¶
Make an HTTP request and return a decoded DataFile response artifact.
- request_workflow(method: str, endpoint: str, *, headers: dict[str, str] | None = None, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) extended_data.workflows.DataWorkflow¶
Make an HTTP request and return a workflow over the decoded response artifact.
- get(endpoint: str, **kwargs: Any) httpx.Response¶
HTTP GET request.
- get_data(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) Any¶
HTTP GET request returning decoded response data.
- get_workflow(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) extended_data.workflows.DataWorkflow¶
HTTP GET request returning a workflow over decoded response data.
- post(endpoint: str, **kwargs: Any) httpx.Response¶
HTTP POST request.
- post_data(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) Any¶
HTTP POST request returning decoded response data.
- post_workflow(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) extended_data.workflows.DataWorkflow¶
HTTP POST request returning a workflow over decoded response data.
- put(endpoint: str, **kwargs: Any) httpx.Response¶
HTTP PUT request.
- put_data(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) Any¶
HTTP PUT request returning decoded response data.
- put_workflow(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) extended_data.workflows.DataWorkflow¶
HTTP PUT request returning a workflow over decoded response data.
- delete(endpoint: str, **kwargs: Any) httpx.Response¶
HTTP DELETE request.
- delete_data(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) Any¶
HTTP DELETE request returning decoded response data.
- delete_workflow(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) extended_data.workflows.DataWorkflow¶
HTTP DELETE request returning a workflow over decoded response data.
- patch(endpoint: str, **kwargs: Any) httpx.Response¶
HTTP PATCH request.
- patch_data(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) Any¶
HTTP PATCH request returning decoded response data.
- patch_workflow(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) extended_data.workflows.DataWorkflow¶
HTTP PATCH request returning a workflow over decoded response data.
- download(url: str, output_path: str) int¶
Download file from URL.
Args: url: URL to download from output_path: Local path to save to
Returns: File size in bytes
- vendor_capabilities: ClassVar[dict[str, vendor_fabric.capabilities.CapabilitySpec]] = None¶
- vendor_capability_methods: ClassVar[dict[str, str]] = None¶
- get_input(k: str, default: Any | None = None, required: bool = False, is_bool: bool = False, is_integer: bool = False, is_float: bool = False, is_path: bool = False, is_datetime: bool = False, as_extended: bool = False) Any¶
- decode_input(k: str, default: Any | None = None, required: bool = False, decode_from_json: bool = False, decode_from_yaml: bool = False, decode_from_base64: bool = False, allow_none: bool = True, as_extended: bool = False) Any¶
- freeze_inputs() extended_data.containers.mappings.ExtendedDict¶
- thaw_inputs() extended_data.containers.mappings.ExtendedDict¶
- snapshot_inputs(*, frozen: bool = False) extended_data.containers.mappings.ExtendedDict¶
- replace_inputs(new_inputs: collections.abc.Mapping[str, Any] | None, *, clear_frozen: bool = True) extended_data.containers.mappings.ExtendedDict¶
- merge_inputs(new_inputs: collections.abc.Mapping[str, Any] | None) extended_data.containers.mappings.ExtendedDict¶
- shift_inputs() extended_data.containers.mappings.ExtendedDict¶