:py:mod:`vendor_fabric.vault` ============================= .. py:module:: vendor_fabric.vault .. autodoc2-docstring:: vendor_fabric.vault :parser: myst :allowtitles: Submodules ---------- .. toctree:: :titlesonly: :maxdepth: 1 vendor_fabric.vault.tools Package Contents ---------------- Classes ~~~~~~~ .. list-table:: :class: autosummary longtable :align: left * - :py:obj:`VaultConnector <vendor_fabric.vault.VaultConnector>` - .. autodoc2-docstring:: vendor_fabric.vault.VaultConnector :parser: myst :summary: Data ~~~~ .. list-table:: :class: autosummary longtable :align: left * - :py:obj:`VAULT_URL_ENV_VAR <vendor_fabric.vault.VAULT_URL_ENV_VAR>` - .. autodoc2-docstring:: vendor_fabric.vault.VAULT_URL_ENV_VAR :parser: myst :summary: * - :py:obj:`VAULT_NAMESPACE_ENV_VAR <vendor_fabric.vault.VAULT_NAMESPACE_ENV_VAR>` - .. autodoc2-docstring:: vendor_fabric.vault.VAULT_NAMESPACE_ENV_VAR :parser: myst :summary: * - :py:obj:`VAULT_ROLE_ID_ENV_VAR <vendor_fabric.vault.VAULT_ROLE_ID_ENV_VAR>` - .. autodoc2-docstring:: vendor_fabric.vault.VAULT_ROLE_ID_ENV_VAR :parser: myst :summary: * - :py:obj:`VAULT_SECRET_ID_ENV_VAR <vendor_fabric.vault.VAULT_SECRET_ID_ENV_VAR>` - .. autodoc2-docstring:: vendor_fabric.vault.VAULT_SECRET_ID_ENV_VAR :parser: myst :summary: * - :py:obj:`VAULT_APPROLE_PATH_ENV_VAR <vendor_fabric.vault.VAULT_APPROLE_PATH_ENV_VAR>` - .. autodoc2-docstring:: vendor_fabric.vault.VAULT_APPROLE_PATH_ENV_VAR :parser: myst :summary: API ~~~ .. py:data:: VAULT_URL_ENV_VAR :canonical: vendor_fabric.vault.VAULT_URL_ENV_VAR :value: 'VAULT_ADDR' .. autodoc2-docstring:: vendor_fabric.vault.VAULT_URL_ENV_VAR :parser: myst .. py:data:: VAULT_NAMESPACE_ENV_VAR :canonical: vendor_fabric.vault.VAULT_NAMESPACE_ENV_VAR :value: 'VAULT_NAMESPACE' .. autodoc2-docstring:: vendor_fabric.vault.VAULT_NAMESPACE_ENV_VAR :parser: myst .. py:data:: VAULT_ROLE_ID_ENV_VAR :canonical: vendor_fabric.vault.VAULT_ROLE_ID_ENV_VAR :value: 'VAULT_ROLE_ID' .. autodoc2-docstring:: vendor_fabric.vault.VAULT_ROLE_ID_ENV_VAR :parser: myst .. py:data:: VAULT_SECRET_ID_ENV_VAR :canonical: vendor_fabric.vault.VAULT_SECRET_ID_ENV_VAR :value: 'VAULT_SECRET_ID' .. autodoc2-docstring:: vendor_fabric.vault.VAULT_SECRET_ID_ENV_VAR :parser: myst .. 
py:data:: VAULT_APPROLE_PATH_ENV_VAR :canonical: vendor_fabric.vault.VAULT_APPROLE_PATH_ENV_VAR :value: 'VAULT_APPROLE_PATH' .. autodoc2-docstring:: vendor_fabric.vault.VAULT_APPROLE_PATH_ENV_VAR :parser: myst .. py:class:: VaultConnector(vault_url: str | None = None, vault_namespace: str | None = None, vault_token: str | None = None, logger: extended_data.logging.Logging | None = None, **kwargs: typing.Any) :canonical: vendor_fabric.vault.VaultConnector Bases: :py:obj:`vendor_fabric.base.ConnectorBase` .. autodoc2-docstring:: vendor_fabric.vault.VaultConnector :parser: myst .. rubric:: Initialization .. autodoc2-docstring:: vendor_fabric.vault.VaultConnector.__init__ :parser: myst .. py:property:: vault_client :canonical: vendor_fabric.vault.VaultConnector.vault_client :type: hvac.Client .. autodoc2-docstring:: vendor_fabric.vault.VaultConnector.vault_client :parser: myst .. py:method:: get_vault_client(vault_url: str | None = None, vault_namespace: str | None = None, vault_token: str | None = None, **kwargs: typing.Any) -> hvac.Client :canonical: vendor_fabric.vault.VaultConnector.get_vault_client :classmethod: .. autodoc2-docstring:: vendor_fabric.vault.VaultConnector.get_vault_client :parser: myst .. py:method:: list_secrets(root_path: str = '/', mount_point: str = 'secret', max_depth: int | None = None) -> extended_data.containers.ExtendedDict :canonical: vendor_fabric.vault.VaultConnector.list_secrets .. autodoc2-docstring:: vendor_fabric.vault.VaultConnector.list_secrets :parser: myst .. py:method:: read_secret(path: str, mount_point: str = 'secret') -> extended_data.containers.ExtendedDict | None :canonical: vendor_fabric.vault.VaultConnector.read_secret .. autodoc2-docstring:: vendor_fabric.vault.VaultConnector.read_secret :parser: myst .. 
py:method:: get_secret(path: str = '/', secret_name: str | None = None, matchers: dict[str, str] | None = None, mount_point: str = 'secret') -> extended_data.containers.ExtendedDict | None :canonical: vendor_fabric.vault.VaultConnector.get_secret .. autodoc2-docstring:: vendor_fabric.vault.VaultConnector.get_secret :parser: myst .. py:method:: write_secret(path: str, data: dict[str, typing.Any], mount_point: str = 'secret') -> bool :canonical: vendor_fabric.vault.VaultConnector.write_secret .. autodoc2-docstring:: vendor_fabric.vault.VaultConnector.write_secret :parser: myst .. py:method:: list_aws_iam_roles(mount_point: str = 'aws', prefix: str | None = None) -> extended_data.containers.ExtendedList[extended_data.containers.ExtendedString] :canonical: vendor_fabric.vault.VaultConnector.list_aws_iam_roles .. autodoc2-docstring:: vendor_fabric.vault.VaultConnector.list_aws_iam_roles :parser: myst .. py:method:: get_aws_iam_role(role_name: str, mount_point: str = 'aws') -> extended_data.containers.ExtendedDict | None :canonical: vendor_fabric.vault.VaultConnector.get_aws_iam_role .. autodoc2-docstring:: vendor_fabric.vault.VaultConnector.get_aws_iam_role :parser: myst .. py:method:: generate_aws_credentials(role_name: str, mount_point: str = 'aws', ttl: str | None = None, credential_type: str | None = None) -> extended_data.containers.ExtendedDict :canonical: vendor_fabric.vault.VaultConnector.generate_aws_credentials .. autodoc2-docstring:: vendor_fabric.vault.VaultConnector.generate_aws_credentials :parser: myst .. py:attribute:: BASE_URL :canonical: vendor_fabric.vault.VaultConnector.BASE_URL :type: typing.ClassVar[str] :value: .. py:attribute:: API_KEY_ENV :canonical: vendor_fabric.vault.VaultConnector.API_KEY_ENV :type: typing.ClassVar[str] :value: .. py:attribute:: CONNECTOR_CATEGORY :canonical: vendor_fabric.vault.VaultConnector.CONNECTOR_CATEGORY :type: typing.ClassVar[str] :value: 'external' .. 
py:attribute:: CONNECTOR_CAPABILITIES :canonical: vendor_fabric.vault.VaultConnector.CONNECTOR_CAPABILITIES :type: typing.ClassVar[tuple[str, ...]] :value: () .. py:attribute:: TIMEOUT :canonical: vendor_fabric.vault.VaultConnector.TIMEOUT :type: typing.ClassVar[float] :value: 300.0 .. py:attribute:: MIN_REQUEST_INTERVAL :canonical: vendor_fabric.vault.VaultConnector.MIN_REQUEST_INTERVAL :type: typing.ClassVar[float] :value: 0.0 .. py:attribute:: MAX_RETRIES :canonical: vendor_fabric.vault.VaultConnector.MAX_RETRIES :type: typing.ClassVar[int] :value: 5 .. py:property:: api_key :canonical: vendor_fabric.vault.VaultConnector.api_key :type: str .. py:property:: client :canonical: vendor_fabric.vault.VaultConnector.client :type: httpx.Client .. py:method:: close() -> None :canonical: vendor_fabric.vault.VaultConnector.close .. py:method:: request(method: str, endpoint: str, *, headers: dict[str, str] | None = None, **kwargs: typing.Any) -> httpx.Response :canonical: vendor_fabric.vault.VaultConnector.request .. py:method:: decode_response(response: httpx.Response, *, suffix: str | None = None, as_extended: bool = True) -> typing.Any :canonical: vendor_fabric.vault.VaultConnector.decode_response .. py:method:: decode_response_file(response: httpx.Response, *, source: str | None = None, suffix: str | None = None, as_extended: bool = True, metadata: collections.abc.Mapping[str, typing.Any] | None = None) -> extended_data.io.DataFile :canonical: vendor_fabric.vault.VaultConnector.decode_response_file .. py:method:: extend_result(value: typing.Any) -> typing.Any :canonical: vendor_fabric.vault.VaultConnector.extend_result .. py:method:: request_data(method: str, endpoint: str, *, headers: dict[str, str] | None = None, suffix: str | None = None, as_extended: bool = True, **kwargs: typing.Any) -> typing.Any :canonical: vendor_fabric.vault.VaultConnector.request_data .. 
py:method:: request_data_file(method: str, endpoint: str, *, headers: dict[str, str] | None = None, suffix: str | None = None, as_extended: bool = True, **kwargs: typing.Any) -> extended_data.io.DataFile :canonical: vendor_fabric.vault.VaultConnector.request_data_file .. py:method:: request_workflow(method: str, endpoint: str, *, headers: dict[str, str] | None = None, suffix: str | None = None, as_extended: bool = True, **kwargs: typing.Any) -> extended_data.workflows.DataWorkflow :canonical: vendor_fabric.vault.VaultConnector.request_workflow .. py:method:: get(endpoint: str, **kwargs: typing.Any) -> httpx.Response :canonical: vendor_fabric.vault.VaultConnector.get .. py:method:: get_data(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: typing.Any) -> typing.Any :canonical: vendor_fabric.vault.VaultConnector.get_data .. py:method:: get_workflow(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: typing.Any) -> extended_data.workflows.DataWorkflow :canonical: vendor_fabric.vault.VaultConnector.get_workflow .. py:method:: post(endpoint: str, **kwargs: typing.Any) -> httpx.Response :canonical: vendor_fabric.vault.VaultConnector.post .. py:method:: post_data(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: typing.Any) -> typing.Any :canonical: vendor_fabric.vault.VaultConnector.post_data .. py:method:: post_workflow(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: typing.Any) -> extended_data.workflows.DataWorkflow :canonical: vendor_fabric.vault.VaultConnector.post_workflow .. py:method:: put(endpoint: str, **kwargs: typing.Any) -> httpx.Response :canonical: vendor_fabric.vault.VaultConnector.put .. py:method:: put_data(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: typing.Any) -> typing.Any :canonical: vendor_fabric.vault.VaultConnector.put_data .. 
py:method:: put_workflow(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: typing.Any) -> extended_data.workflows.DataWorkflow :canonical: vendor_fabric.vault.VaultConnector.put_workflow .. py:method:: delete(endpoint: str, **kwargs: typing.Any) -> httpx.Response :canonical: vendor_fabric.vault.VaultConnector.delete .. py:method:: delete_data(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: typing.Any) -> typing.Any :canonical: vendor_fabric.vault.VaultConnector.delete_data .. py:method:: delete_workflow(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: typing.Any) -> extended_data.workflows.DataWorkflow :canonical: vendor_fabric.vault.VaultConnector.delete_workflow .. py:method:: patch(endpoint: str, **kwargs: typing.Any) -> httpx.Response :canonical: vendor_fabric.vault.VaultConnector.patch .. py:method:: patch_data(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: typing.Any) -> typing.Any :canonical: vendor_fabric.vault.VaultConnector.patch_data .. py:method:: patch_workflow(endpoint: str, *, suffix: str | None = None, as_extended: bool = True, **kwargs: typing.Any) -> extended_data.workflows.DataWorkflow :canonical: vendor_fabric.vault.VaultConnector.patch_workflow .. py:method:: download(url: str, output_path: str) -> int :canonical: vendor_fabric.vault.VaultConnector.download .. py:attribute:: vendor_capabilities :canonical: vendor_fabric.vault.VaultConnector.vendor_capabilities :type: typing.ClassVar[dict[str, vendor_fabric.capabilities.CapabilitySpec]] :value: None .. py:attribute:: vendor_capability_methods :canonical: vendor_fabric.vault.VaultConnector.vendor_capability_methods :type: typing.ClassVar[dict[str, str]] :value: None .. 
py:method:: get_input(k: str, default: typing.Any | None = None, required: bool = False, is_bool: bool = False, is_integer: bool = False, is_float: bool = False, is_path: bool = False, is_datetime: bool = False, as_extended: bool = False) -> typing.Any :canonical: vendor_fabric.vault.VaultConnector.get_input .. py:method:: decode_input(k: str, default: typing.Any | None = None, required: bool = False, decode_from_json: bool = False, decode_from_yaml: bool = False, decode_from_base64: bool = False, allow_none: bool = True, as_extended: bool = False) -> typing.Any :canonical: vendor_fabric.vault.VaultConnector.decode_input .. py:method:: freeze_inputs() -> extended_data.containers.mappings.ExtendedDict :canonical: vendor_fabric.vault.VaultConnector.freeze_inputs .. py:method:: thaw_inputs() -> extended_data.containers.mappings.ExtendedDict :canonical: vendor_fabric.vault.VaultConnector.thaw_inputs .. py:method:: snapshot_inputs(*, frozen: bool = False) -> extended_data.containers.mappings.ExtendedDict :canonical: vendor_fabric.vault.VaultConnector.snapshot_inputs .. py:method:: replace_inputs(new_inputs: collections.abc.Mapping[str, typing.Any] | None, *, clear_frozen: bool = True) -> extended_data.containers.mappings.ExtendedDict :canonical: vendor_fabric.vault.VaultConnector.replace_inputs .. py:method:: merge_inputs(new_inputs: collections.abc.Mapping[str, typing.Any] | None) -> extended_data.containers.mappings.ExtendedDict :canonical: vendor_fabric.vault.VaultConnector.merge_inputs .. py:method:: shift_inputs() -> extended_data.containers.mappings.ExtendedDict :canonical: vendor_fabric.vault.VaultConnector.shift_inputs