vendor_fabric.base

Base class for all vendor fabric connectors.

This module provides ConnectorBase, the foundation for all connectors in the package's connector fabric. It extends InputProvider and provides:

  1. Credential loading from env vars, stdin, or direct inputs

  2. HTTP client with retries and rate limiting

  3. Provider capability metadata and dispatch hooks

All connectors should extend this class rather than extending InputProvider directly.

Usage:

from extended_data import ExtendedDict
from vendor_fabric import ConnectorBase

class MyConnector(ConnectorBase):
    API_KEY_ENV = "MY_API_KEY"  # Required env var name
    BASE_URL = "https://api.example.com"

    def __init__(self, api_key: str | None = None, **kwargs):
        super().__init__(**kwargs)
        self._api_key = api_key or self.get_input(self.API_KEY_ENV, required=True)

    def my_operation(self) -> ExtendedDict:
        return self.request_data("GET", "/endpoint", suffix="json")

Module Contents

Classes

ConnectorBase

Base class for all vendor fabric connectors.

API

exception vendor_fabric.base.RateLimitError

Bases: Exception

Raised when the API rate limit is hit; triggers a retry.

Initialization

Initialize self. See help(type(self)) for accurate signature.

Inherited members: add_note(), args, with_traceback()

exception vendor_fabric.base.ConnectorAPIError(message: str, status_code: int | None = None)

Bases: Exception

Raised when the API returns an error.
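A minimal sketch of how a caller might branch on the status code. The class below is a stand-in that only mirrors the documented constructor signature; that the real exception keeps status_code as an instance attribute is an assumption, and fetch_or_none is a hypothetical helper.

```python
from __future__ import annotations


class ConnectorAPIError(Exception):
    """Stand-in mirroring the documented signature (not the real class)."""

    def __init__(self, message: str, status_code: int | None = None):
        super().__init__(message)
        # Assumption: the status code is kept on the instance.
        self.status_code = status_code


def fetch_or_none(fetch):
    """Hypothetical helper: treat a 404 as 'no data', re-raise anything else."""
    try:
        return fetch()
    except ConnectorAPIError as exc:
        if exc.status_code == 404:
            return None
        raise


def missing():
    # Simulates a connector call that fails with "not found".
    raise ConnectorAPIError("not found", status_code=404)


result = fetch_or_none(missing)
```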

Initialization

Initialize self. See help(type(self)) for accurate signature.

Inherited members: add_note(), args, with_traceback()

class vendor_fabric.base.ConnectorBase(api_key: str | None = None, base_url: str | None = None, timeout: float | None = None, logger: extended_data.logging.Logging | None = None, **kwargs: Any)

Bases: vendor_fabric.capabilities.CapabilityProviderMixin, extended_data.inputs.InputProvider, abc.ABC

Base class for all vendor fabric connectors.

Provides:

  • InputProvider for credential loading (env, stdin, direct)

  • HTTP client with connection pooling

  • Automatic retries with exponential backoff

  • Rate limiting

  • Provider capability registration scaffolding

Class Attributes:

  • BASE_URL: API base URL (required for HTTP connectors)

  • API_KEY_ENV: Environment variable name for API key

  • CONNECTOR_CATEGORY: Catalog category for registry metadata

  • CONNECTOR_CAPABILITIES: Catalog capabilities for registry metadata

  • TIMEOUT: HTTP timeout in seconds (default 300)

  • MIN_REQUEST_INTERVAL: Minimum seconds between requests (rate limiting)

  • MAX_RETRIES: Maximum retry attempts (default 5)

Instance Attributes:

  • logger: Logger instance

  • _client: HTTP client (lazy-initialized)
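To illustrate what a minimum interval between requests means in practice, here is a small sketch of a throttle. It is not the library's implementation: MinIntervalThrottle and its injectable clock and sleep parameters are hypothetical, chosen so the behaviour can be checked without real waiting.

```python
from __future__ import annotations

import time


class MinIntervalThrottle:
    """Hypothetical helper: keep calls at least `interval` seconds apart."""

    def __init__(self, interval: float, clock=time.monotonic, sleep=time.sleep):
        self.interval = interval
        self._clock = clock
        self._sleep = sleep
        self._last: float | None = None  # time of the previous call, if any

    def wait(self) -> float:
        """Sleep if the previous call was too recent; return seconds slept."""
        delay = 0.0
        if self._last is not None:
            elapsed = self._clock() - self._last
            delay = max(0.0, self.interval - elapsed)
        if delay > 0:
            self._sleep(delay)
        self._last = self._clock()
        return delay
```

With an interval of 0.0 (the documented default for MIN_REQUEST_INTERVAL) a throttle like this never sleeps.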

Initialization

Initialize the connector.

Args:

  • api_key: API key (overrides environment variable)

  • base_url: Base URL (overrides class default)

  • timeout: HTTP timeout in seconds

  • logger: Logger instance

  • **kwargs: Passed to InputProvider
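The precedence described for api_key (explicit argument first, environment variable second) can be sketched as follows. resolve_api_key is a hypothetical stand-alone function, not part of the package, and it covers only the environment fallback; the real class can also read stdin and direct inputs.

```python
from __future__ import annotations

import os
from collections.abc import Mapping


def resolve_api_key(
    explicit: str | None,
    env_name: str,
    environ: Mapping[str, str] = os.environ,
) -> str:
    """Hypothetical helper: prefer the explicit value, then the environment."""
    if explicit:
        return explicit
    value = environ.get(env_name)
    if not value:
        # Neither source supplied a key, so fail loudly.
        raise ValueError(f"API key not provided and {env_name} is not set")
    return value
```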

BASE_URL: ClassVar[str] = <Multiline-String>
API_KEY_ENV: ClassVar[str] = <Multiline-String>
CONNECTOR_CATEGORY: ClassVar[str] = 'external'
CONNECTOR_CAPABILITIES: ClassVar[tuple[str, ...]] = ()
TIMEOUT: ClassVar[float] = 300.0
MIN_REQUEST_INTERVAL: ClassVar[float] = 0.0
MAX_RETRIES: ClassVar[int] = 5
property api_key: str

Get API key, raising if not set.

property client: httpx.Client

Get or create HTTP client with connection pooling.

close() → None

Close HTTP client and release resources.
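The lazy-initialization pattern behind client and close() can be sketched like this. FakeClient and LazyClientHolder are hypothetical stand-ins; whether the real close() resets the cached client so a later access creates a fresh one is an assumption.

```python
class FakeClient:
    """Stand-in for an HTTP client; it only records whether it was closed."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class LazyClientHolder:
    """Hypothetical sketch: create the client on first use, drop it on close."""

    def __init__(self, factory=FakeClient):
        self._factory = factory
        self._client = None  # nothing is created until first access

    @property
    def client(self):
        if self._client is None:
            self._client = self._factory()
        return self._client

    def close(self):
        if self._client is not None:
            self._client.close()
            self._client = None  # assumption: a later access builds a new client
```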

request(method: str, endpoint: str, *, headers: dict[str, str] | None = None, **kwargs: Any) → httpx.Response

Make HTTP request with retries and rate limiting.

Args:

  • method: HTTP method (GET, POST, PUT, DELETE, etc.)

  • endpoint: API endpoint (relative to BASE_URL)

  • headers: Additional headers (merged with defaults)

  • **kwargs: Passed to httpx.request (json, params, data, etc.)

Returns: httpx.Response

Raises:

  • RateLimitError: On 429 or 5xx responses after retries are exhausted.

  • ConnectorAPIError: On other API errors.
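The retry behaviour described above can be sketched as a loop with exponential backoff. This is an illustration under stated assumptions, not the library's code: the exception classes are stand-ins, send is a hypothetical callable returning a (status_code, body) pair, and the base delay and doubling factor are assumed values.

```python
from __future__ import annotations

import time
from typing import Callable


class RateLimitError(Exception):
    """Stand-in for the documented exception."""


class ConnectorAPIError(Exception):
    """Stand-in mirroring the documented signature."""

    def __init__(self, message: str, status_code: int | None = None):
        super().__init__(message)
        self.status_code = status_code


def request_with_retries(
    send: Callable[[], tuple[int, str]],
    max_retries: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call `send` until it succeeds or the retry budget is spent."""
    attempt = 0
    while True:
        status, body = send()
        if status == 429 or 500 <= status < 600:
            # Retryable: back off and try again, unless the budget is spent.
            if attempt >= max_retries:
                raise RateLimitError(f"gave up after {attempt} retries (HTTP {status})")
            sleep(base_delay * 2**attempt)  # delay doubles on each retry
            attempt += 1
            continue
        if status >= 400:
            # Any other client error is not retried.
            raise ConnectorAPIError(f"HTTP {status}: {body}", status_code=status)
        return body
```

Injecting sleep keeps the sketch testable; a caller would normally leave it as time.sleep.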

decode_response(response: httpx.Response, *, suffix: str | None = None, as_extended: bool = True) → Any

Decode an HTTP response body through the extended-data IO layer.

Structured response bodies are decoded from JSON, YAML, TOML, or HCL and promoted through the ExtendedData root by default. Text responses become raw strings, and unknown binary responses remain bytes.
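A reduced sketch of the three outcomes described above (structured data, text, bytes). decode_body is a hypothetical function that handles only JSON via the standard library; YAML, TOML, HCL, and promotion to ExtendedData are left out, and the use of a content type to detect text is an assumption.

```python
from __future__ import annotations

import json
from typing import Any


def decode_body(body: bytes, suffix: str | None = None, content_type: str = "") -> Any:
    """Hypothetical helper: decode JSON, fall back to text, else keep bytes."""
    if suffix == "json" or "json" in content_type:
        return json.loads(body)  # structured payload becomes dict/list/scalars
    if content_type.startswith("text/"):
        return body.decode("utf-8")  # text stays a raw string
    return body  # unknown binary content is returned untouched


parsed = decode_body(b'{"a": 1}', suffix="json")
```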

decode_response_file(response: httpx.Response, *, source: str | None = None, suffix: str | None = None, as_extended: bool = True, metadata: collections.abc.Mapping[str, Any] | None = None) → extended_data.io.DataFile

Decode an HTTP response body into a DataFile artifact with provenance.

extend_result(value: Any) → Any

Promote connector data payloads through the ExtendedData root.

request_data(method: str, endpoint: str, *, headers: dict[str, str] | None = None, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) → Any

Make an HTTP request and return decoded response data.

request_data_file(method: str, endpoint: str, *, headers: dict[str, str] | None = None, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) → extended_data.io.DataFile

Make an HTTP request and return a decoded DataFile response artifact.

request_workflow(method: str, endpoint: str, *, headers: dict[str, str] | None = None, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) → extended_data.workflows.DataWorkflow

Make an HTTP request and return a workflow over the decoded response artifact.

get(endpoint: str, **kwargs: Any) → httpx.Response

HTTP GET request.

get_data(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) → Any

HTTP GET request returning decoded response data.

get_workflow(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) → extended_data.workflows.DataWorkflow

HTTP GET request returning a workflow over decoded response data.

post(endpoint: str, **kwargs: Any) → httpx.Response

HTTP POST request.

post_data(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) → Any

HTTP POST request returning decoded response data.

post_workflow(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) → extended_data.workflows.DataWorkflow

HTTP POST request returning a workflow over decoded response data.

put(endpoint: str, **kwargs: Any) → httpx.Response

HTTP PUT request.

put_data(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) → Any

HTTP PUT request returning decoded response data.

put_workflow(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) → extended_data.workflows.DataWorkflow

HTTP PUT request returning a workflow over decoded response data.

delete(endpoint: str, **kwargs: Any) → httpx.Response

HTTP DELETE request.

delete_data(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) → Any

HTTP DELETE request returning decoded response data.

delete_workflow(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) → extended_data.workflows.DataWorkflow

HTTP DELETE request returning a workflow over decoded response data.

patch(endpoint: str, **kwargs: Any) → httpx.Response

HTTP PATCH request.

patch_data(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) → Any

HTTP PATCH request returning decoded response data.

patch_workflow(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: Any) → extended_data.workflows.DataWorkflow

HTTP PATCH request returning a workflow over decoded response data.
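The verb helpers above read as thin wrappers that fix the HTTP method and forward everything else. Assuming that is how they work (the source does not show their bodies), the pattern might look like the sketch below, where VerbHelpers and its echoing request() are hypothetical stand-ins.

```python
from __future__ import annotations

from typing import Any


class VerbHelpers:
    """Hypothetical sketch: each helper only fixes the HTTP method."""

    def request(self, method: str, endpoint: str, **kwargs: Any) -> tuple[str, str, dict[str, Any]]:
        # Stand-in for the real request(); it echoes its arguments.
        return (method, endpoint, kwargs)

    def get(self, endpoint: str, **kwargs: Any):
        return self.request("GET", endpoint, **kwargs)

    def post(self, endpoint: str, **kwargs: Any):
        return self.request("POST", endpoint, **kwargs)

    def patch(self, endpoint: str, **kwargs: Any):
        return self.request("PATCH", endpoint, **kwargs)


call = VerbHelpers().get("/items", params={"page": 1})
```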

download(url: str, output_path: str) → int

Download file from URL.

Args:

  • url: URL to download from

  • output_path: Local path to save to

Returns: File size in bytes
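The "write to a path, return the size in bytes" contract can be sketched without any network access. save_chunks is a hypothetical helper that takes an iterable of byte chunks in place of a streamed HTTP body; how the real method fetches and streams the data is not shown in the source.

```python
from __future__ import annotations

from collections.abc import Iterable
from pathlib import Path


def save_chunks(chunks: Iterable[bytes], output_path: str) -> int:
    """Hypothetical helper: write chunks to output_path, return bytes written."""
    total = 0
    with Path(output_path).open("wb") as fh:
        for chunk in chunks:
            total += fh.write(chunk)  # write() reports the bytes it wrote
    return total
```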

vendor_capabilities: ClassVar[dict[str, vendor_fabric.capabilities.CapabilitySpec]] = None
vendor_capability_methods: ClassVar[dict[str, str]] = None
get_input(k: str, default: Any | None = None, required: bool = False, is_bool: bool = False, is_integer: bool = False, is_float: bool = False, is_path: bool = False, is_datetime: bool = False, as_extended: bool = False) → Any
decode_input(k: str, default: Any | None = None, required: bool = False, decode_from_json: bool = False, decode_from_yaml: bool = False, decode_from_base64: bool = False, allow_none: bool = True, as_extended: bool = False) → Any
freeze_inputs() → extended_data.containers.mappings.ExtendedDict
thaw_inputs() → extended_data.containers.mappings.ExtendedDict
snapshot_inputs(*, frozen: bool = False) → extended_data.containers.mappings.ExtendedDict
replace_inputs(new_inputs: collections.abc.Mapping[str, Any] | None, *, clear_frozen: bool = True) → extended_data.containers.mappings.ExtendedDict
merge_inputs(new_inputs: collections.abc.Mapping[str, Any] | None) → extended_data.containers.mappings.ExtendedDict
shift_inputs() → extended_data.containers.mappings.ExtendedDict