Module pachyderm_sdk.api.cdr.resolver

Handwritten classes/methods that augment the existing CDR API.

Expand source code
"""Handwritten classes/methods that augment the existing CDR API."""

import os
import gzip
from hashlib import blake2b
from hmac import compare_digest
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse

from betterproto import which_one_of

from . import (
    Ref,
    Cipher,
    Compress,
    Concat,
    ContentHash,
    Http,
    SizeLimits,
    Slice,
    CipherAlgo,
    CompressAlgo,
    HashAlgo,
)

try:
    import requests
    from Crypto.Cipher import ChaCha20

    CDR_ENABLED = True
except ImportError:
    requests = None
    ChaCha20 = None
    CDR_ENABLED = False


class CdrResolver:
    """Class capable of resolving CDRs returned by the PFS API."""

    def __init__(
        self,
        *,
        cache_location: Optional[os.PathLike] = None,
        fetch_missing_chunks: bool = True,
        http_host_replacement: str = "",
    ):
        """Creates a CdrResolver.
        Whether CDR functionality is enabled is checked at time of initialization.

        Parameters
        ----------
        http_host_replacement : str
            The value of this parameter replaces the host (including port) within
            the presigned URLs when resolving CDRs. This configuration is useful
            if, for some reason, the URL that pachd uses to interact with object
            storage is different from the one that you use. For example, if your
            Pachyderm object storage is running within your kubernetes cluster and
            pachd is configured to use a URL which is only valid in cluster.
            This may fix any issues, depending on the object storage and how they
            generate presigned URLs.
            Example: localhost:9000
        """
        if not CDR_ENABLED:
            raise RuntimeError(
                f"CDR functionality is not enabled for this installation "
                f"of the pachyderm_sdk package. \n"
                f"To enable CDR functionality, reinstall package with: \n"
                f"  - pip install pachyderm_sdk[cdr]"
            )
        self.cache = cache_location
        if self.cache is not None:
            self.cache = Path(os.path.expanduser(self.cache)).resolve()
            self.cache.mkdir(parents=True, exist_ok=True)
        self.fetch_missing_chunks = fetch_missing_chunks
        self.http_host_replacement = http_host_replacement

    def resolve(self, ref: Ref) -> bytes:
        """Resolve a CDR reference."""
        field, body = which_one_of(ref, "body")
        if isinstance(body, Http):
            return self._dereference_http(body)
        elif isinstance(body, Cipher):
            return self._dereference_cipher(body)
        elif isinstance(body, Compress):
            return self._dereference_compress(body)
        elif isinstance(body, ContentHash):
            return self._dereference_content_hash(body)
        elif isinstance(body, SizeLimits):
            return self._dereference_size_limits(body)
        elif isinstance(body, Concat):
            return b"".join(map(self.resolve, body.refs))
        elif isinstance(body, Slice):
            return self.resolve(body.inner)[body.start : body.end]
        else:
            raise ValueError(f"unsupported Ref variant: {body}")

    def _dereference_http(self, body: Http) -> bytes:
        """Resolves an HTTP CDR. This means retrieving the data from object
        storage using a presigned URL.

        The HTTP CDR is the "bottom" of the CDR structure and therefore resolution
        does not recurse any deeper.

        If http_host_replacement was configured on the class, this substitution
        is made here. Additionally the Host header will be overwritten with the
        original host of the presigned URL.
        """
        url, headers = body.url, body.headers
        if self.http_host_replacement:
            parsed_url = urlparse(body.url)
            headers["Host"] = parsed_url.netloc
            url = parsed_url._replace(netloc=self.http_host_replacement).geturl()

        response = requests.get(url=url, headers=headers)
        try:
            response.raise_for_status()
        except requests.HTTPError as err:
            text = err.response.text
            if text:
                # If there's a response, log it in the error chain.
                raise requests.HTTPError(
                    f"Error {err.response.status_code} - HTTP response: {text}"
                ) from err
            raise err
        return response.content

    def _dereference_cipher(self, body: Cipher) -> bytes:
        """Resolves a Cipher CDR. This method must resolve its inner CDR."""
        if body.algo != CipherAlgo.CHACHA20:
            raise ValueError(f"unrecognized cipher algorithm: {body.algo}")
        inner = self.resolve(body.inner)
        cipher = ChaCha20.new(key=body.key, nonce=body.nonce)
        return cipher.decrypt(inner)

    def _dereference_compress(self, body: Compress) -> bytes:
        """Resolves a Compress CDR. This method must resolve its inner CDR."""
        if body.algo != CompressAlgo.GZIP:
            raise ValueError(f"unrecognized compress algorithm: {body.algo}")
        inner = self.resolve(body.inner)
        return gzip.decompress(inner)

    def _dereference_content_hash(self, body: ContentHash) -> bytes:
        """Resolves a ContentHash CDR. This method must resolve its inner CDR.

        If the cache_location has been set:
          * the content will be read from disk if the content exists within the cache,
            rather than resolving the HTTP reference.
          * the content will be saved to the cache after resolving the HTTP reference.
          * the cache uses the hex string of the content hash as its key (file name).
        """

        def _deref_inner() -> bytes:
            if body.algo != HashAlgo.BLAKE2b_256:
                raise ValueError(f"unrecognized hash algorithm: {body.algo}")
            inner = self.resolve(body.inner)
            inner_hash = blake2b(inner, digest_size=32).digest()
            if not compare_digest(inner_hash, body.hash):
                raise ValueError(
                    f"content failed hash check. HAVE: {inner_hash} WANT: {body.hash}"
                )
            return inner

        if not self.cache:
            return _deref_inner()

        chunk_file = self.cache.joinpath(self._chunk_name(body))
        if chunk_file.exists():
            return chunk_file.read_bytes()
        if not chunk_file.exists() and self.fetch_missing_chunks:
            content = _deref_inner()
            chunk_file.write_bytes(content)
            return content
        raise FileNotFoundError(f"chunk missing from cache: {chunk_file}")

    def _dereference_size_limits(self, body: SizeLimits) -> bytes:
        """Resolves a SizeLimits CDR. This method must resolve its inner CDR."""
        inner = self.resolve(body.inner)
        if body.min and len(inner) < body.min:
            raise ValueError(
                f"content failed minimum size check. "
                f"HAVE: {len(inner)} bytes "
                f"WANT: {body.min} bytes "
            )
        if body.max and len(inner) > body.max:
            raise ValueError(
                f"content failed minimum size check. "
                f"HAVE: {len(inner)} bytes "
                f"WANT: {body.max} bytes "
            )
        return inner

    @staticmethod
    def _chunk_name(content_hash: ContentHash) -> str:
        algorith = HashAlgo(content_hash.algo)
        return f"{algorith.name.lower()}_{content_hash.hash.hex()}"

Classes

class CdrResolver (*, cache_location: Optional[os.PathLike] = None, fetch_missing_chunks: bool = True, http_host_replacement: str = '')

Class capable of resolving CDRs returned by the PFS API.

Creates a CdrResolver. Whether CDR functionality is enabled is checked at time of initialization.

Parameters

http_host_replacement : str
The value of this parameter replaces the host (including port) within the presigned URLs when resolving CDRs. This configuration is useful if, for some reason, the URL that pachd uses to interact with object storage is different from the one that you use. For example, if your Pachyderm object storage is running within your kubernetes cluster and pachd is configured to use a URL which is only valid in cluster. This may fix any issues, depending on the object storage and how they generate presigned URLs. Example: localhost:9000
Expand source code
class CdrResolver:
    """Class capable of resolving CDRs returned by the PFS API."""

    def __init__(
        self,
        *,
        cache_location: Optional[os.PathLike] = None,
        fetch_missing_chunks: bool = True,
        http_host_replacement: str = "",
    ):
        """Creates a CdrResolver.
        Whether CDR functionality is enabled is checked at time of initialization.

        Parameters
        ----------
        http_host_replacement : str
            The value of this parameter replaces the host (including port) within
            the presigned URLs when resolving CDRs. This configuration is useful
            if, for some reason, the URL that pachd uses to interact with object
            storage is different from the one that you use. For example, if your
            Pachyderm object storage is running within your kubernetes cluster and
            pachd is configured to use a URL which is only valid in cluster.
            This may fix any issues, depending on the object storage and how they
            generate presigned URLs.
            Example: localhost:9000
        """
        if not CDR_ENABLED:
            raise RuntimeError(
                f"CDR functionality is not enabled for this installation "
                f"of the pachyderm_sdk package. \n"
                f"To enable CDR functionality, reinstall package with: \n"
                f"  - pip install pachyderm_sdk[cdr]"
            )
        self.cache = cache_location
        if self.cache is not None:
            self.cache = Path(os.path.expanduser(self.cache)).resolve()
            self.cache.mkdir(parents=True, exist_ok=True)
        self.fetch_missing_chunks = fetch_missing_chunks
        self.http_host_replacement = http_host_replacement

    def resolve(self, ref: Ref) -> bytes:
        """Resolve a CDR reference."""
        field, body = which_one_of(ref, "body")
        if isinstance(body, Http):
            return self._dereference_http(body)
        elif isinstance(body, Cipher):
            return self._dereference_cipher(body)
        elif isinstance(body, Compress):
            return self._dereference_compress(body)
        elif isinstance(body, ContentHash):
            return self._dereference_content_hash(body)
        elif isinstance(body, SizeLimits):
            return self._dereference_size_limits(body)
        elif isinstance(body, Concat):
            return b"".join(map(self.resolve, body.refs))
        elif isinstance(body, Slice):
            return self.resolve(body.inner)[body.start : body.end]
        else:
            raise ValueError(f"unsupported Ref variant: {body}")

    def _dereference_http(self, body: Http) -> bytes:
        """Resolves an HTTP CDR. This means retrieving the data from object
        storage using a presigned URL.

        The HTTP CDR is the "bottom" of the CDR structure and therefore resolution
        does not recurse any deeper.

        If http_host_replacement was configured on the class, this substitution
        is made here. Additionally the Host header will be overwritten with the
        original host of the presigned URL.
        """
        url, headers = body.url, body.headers
        if self.http_host_replacement:
            parsed_url = urlparse(body.url)
            headers["Host"] = parsed_url.netloc
            url = parsed_url._replace(netloc=self.http_host_replacement).geturl()

        response = requests.get(url=url, headers=headers)
        try:
            response.raise_for_status()
        except requests.HTTPError as err:
            text = err.response.text
            if text:
                # If there's a response, log it in the error chain.
                raise requests.HTTPError(
                    f"Error {err.response.status_code} - HTTP response: {text}"
                ) from err
            raise err
        return response.content

    def _dereference_cipher(self, body: Cipher) -> bytes:
        """Resolves a Cipher CDR. This method must resolve its inner CDR."""
        if body.algo != CipherAlgo.CHACHA20:
            raise ValueError(f"unrecognized cipher algorithm: {body.algo}")
        inner = self.resolve(body.inner)
        cipher = ChaCha20.new(key=body.key, nonce=body.nonce)
        return cipher.decrypt(inner)

    def _dereference_compress(self, body: Compress) -> bytes:
        """Resolves a Compress CDR. This method must resolve its inner CDR."""
        if body.algo != CompressAlgo.GZIP:
            raise ValueError(f"unrecognized compress algorithm: {body.algo}")
        inner = self.resolve(body.inner)
        return gzip.decompress(inner)

    def _dereference_content_hash(self, body: ContentHash) -> bytes:
        """Resolves a ContentHash CDR. This method must resolve its inner CDR.

        If the cache_location has been set:
          * the content will be read from disk if the content exists within the cache,
            rather than resolving the HTTP reference.
          * the content will be saved to the cache after resolving the HTTP reference.
          * the cache uses the hex string of the content hash as its key (file name).
        """

        def _deref_inner() -> bytes:
            if body.algo != HashAlgo.BLAKE2b_256:
                raise ValueError(f"unrecognized hash algorithm: {body.algo}")
            inner = self.resolve(body.inner)
            inner_hash = blake2b(inner, digest_size=32).digest()
            if not compare_digest(inner_hash, body.hash):
                raise ValueError(
                    f"content failed hash check. HAVE: {inner_hash} WANT: {body.hash}"
                )
            return inner

        if not self.cache:
            return _deref_inner()

        chunk_file = self.cache.joinpath(self._chunk_name(body))
        if chunk_file.exists():
            return chunk_file.read_bytes()
        if not chunk_file.exists() and self.fetch_missing_chunks:
            content = _deref_inner()
            chunk_file.write_bytes(content)
            return content
        raise FileNotFoundError(f"chunk missing from cache: {chunk_file}")

    def _dereference_size_limits(self, body: SizeLimits) -> bytes:
        """Resolves a SizeLimits CDR. This method must resolve its inner CDR."""
        inner = self.resolve(body.inner)
        if body.min and len(inner) < body.min:
            raise ValueError(
                f"content failed minimum size check. "
                f"HAVE: {len(inner)} bytes "
                f"WANT: {body.min} bytes "
            )
        if body.max and len(inner) > body.max:
            raise ValueError(
                f"content failed minimum size check. "
                f"HAVE: {len(inner)} bytes "
                f"WANT: {body.max} bytes "
            )
        return inner

    @staticmethod
    def _chunk_name(content_hash: ContentHash) -> str:
        algorith = HashAlgo(content_hash.algo)
        return f"{algorith.name.lower()}_{content_hash.hash.hex()}"

Methods

def resolve(self, ref: Ref) ‑> bytes

Resolve a CDR reference.

Expand source code
def resolve(self, ref: Ref) -> bytes:
    """Resolve a CDR reference."""
    field, body = which_one_of(ref, "body")
    if isinstance(body, Http):
        return self._dereference_http(body)
    elif isinstance(body, Cipher):
        return self._dereference_cipher(body)
    elif isinstance(body, Compress):
        return self._dereference_compress(body)
    elif isinstance(body, ContentHash):
        return self._dereference_content_hash(body)
    elif isinstance(body, SizeLimits):
        return self._dereference_size_limits(body)
    elif isinstance(body, Concat):
        return b"".join(map(self.resolve, body.refs))
    elif isinstance(body, Slice):
        return self.resolve(body.inner)[body.start : body.end]
    else:
        raise ValueError(f"unsupported Ref variant: {body}")