# Public names exported by ``from datatorch import *``.
#
# Every entry must be bound in this module: a name that is listed here but
# never imported or defined makes the star import raise AttributeError.
# "CommitActive" was listed without a matching import, so it is left out.
__all__ = [
    # Api
    "BASE_URL",
    "BASE_URL_API",
    "ApiClient",
    # Agent
    "get_inputs",
    # Artifact
    "Artifact",
    "Commit",
]
def _download_redirect(self, url: str, path: Path):
    """Stream ``url`` (relative to the file v2 endpoint) into ``path``.

    Args:
        url: Endpoint path appended to ``self.file_v2_url``.
        path: Destination file; its parent directory is created if needed.

    Raises:
        requests.HTTPError: The server answered with a 4xx/5xx status.
    """
    url = f"{self.file_v2_url}/{url}"
    headers = {
        self.token_header: self._api_token,
        # Azure blob type header
        "x-ms-blob-type": "BlockBlob",
    }
    mkdir_exists(str(path.parent))
    with self._session.get(url, headers=headers, stream=True) as response:
        # Check the status before the destination file is created. Without
        # this, an error payload is written to ``path`` and any caller that
        # only tests ``path.exists()`` treats it as a finished download.
        response.raise_for_status()
        with open(path, "wb") as file:
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)
= res.headers["Location"] + pbuff = get_upload_stats().add(category, buff) + self._session.put( + new_url, + data=pbuff, + headers=headers, + allow_redirects=True, + ) + + def upload_artifact_file( + self, artifact_id: Union[str, UUID], file_path: Path, file_hash: str + ): + url = f"artifact/{str(artifact_id)}/file/{file_hash}" + self._upload_redirect(url, file_path, category="artifact") + + def upload_commit_manifest(self, commit_id: Union[str, UUID], manifest_path: Path): + url = f"commit/{str(commit_id)}/manifest" + self._upload_redirect(url, manifest_path, category="artifact") + + def upload_commit_migration( + self, commit_id: Union[str, UUID], migration_path: Path + ): + url = f"commit/{str(commit_id)}/migration" + self._upload_redirect(url, migration_path, category="artifact") + + def download_commit_manifest(self, commit_id: Union[str, UUID]): + url = f"commit/{str(commit_id)}/manifest" + path = Path(self.artifact_dir.commit_manifest(commit_id)) + if path.exists(): + return path + + self._download_redirect(url, path) + return path + + # def create_artifact(self, login: str, project: str, name: str) -> ArtifactEntity: + # res = self.execute( + # """ + # mutation CreateArtifact($login: String!, $project: String!, $name: String!) { + # artifact: createArtifact() + # } + # """ + # ) + + def artifact_by_name(self, login: str, project: str, name: str) -> ArtifactEntity: + res = self.execute( + """ + query GetArtifact($login: String!, $project: String!, $name: String!) { + artifact: artifactByName(login: $login, project: $project, name: $name) { + id + name + latest { + id + status + message + artifactId + parentId + } + } + } + """, + params=dict(login=login, project=project, name=name), + ) + artifact = res.get("artifact") + if artifact is None: + raise ArtifactExistsError( + "Artifact does not exist. Makes sure you are " + + "using the correct, namespace, project name " + + "and artifact name." 
def create_commit(
    self,
    commit_id: str,
    artifact_id: str,
    parent_id: Optional[str] = None,
    status: "Optional[CommitStatus]" = None,
    message: Optional[str] = None,
):
    """Create a commit entity for an artifact through the GraphQL API.

    Args:
        commit_id: Id of the commit to create.
        artifact_id: Id of the artifact the commit belongs to.
        parent_id: Id of the parent commit, if any.
        status: Initial commit status; sent as its string form.
        message: Commit message.

    Returns:
        True when the response contains a ``commit`` object.
    """
    res = self.execute(
        """
        mutation CreateArtifactCommit(
          $commitId: ID!
          $artifactId: ID!
          $status: ArtifactCommitStatus
          $message: String
          $parentId: ID
        ) {
          commit: createArtifactCommit(
            input: {
              id: $commitId
              artifactId: $artifactId
              status: $status
              message: $message
              parentId: $parentId
            }
          ) {
            id
          }
        }
        """,
        params=dict(
            commitId=commit_id,
            artifactId=artifact_id,
            # str(None) would send the literal "None", which is not a valid
            # value for the optional $status variable; send null instead.
            status=None if status is None else str(status),
            message=message,
            parentId=parent_id,
        ),
    )
    return res.get("commit") is not None
class Artifact(object):
    """Client-side handle for a named artifact.

    An artifact is addressed as ``<namespace>/<project>/<name>``. The
    instance owns one pending ``Commit`` on which ``add`` and ``remove``
    stage changes; ``commit`` submits it and starts a fresh pending commit.
    """

    @classmethod
    def create(cls, full_name: str):
        """Placeholder: artifact creation is not implemented yet."""
        pass

    def __init__(self, full_name: Union[str, UUID], tag: str = "latest") -> None:
        """Bind to the artifact named ``full_name``.

        Args:
            full_name: ``"<namespace>/<project>/<name>"``.
            tag: Which commit ``head`` resolves to; only ``"latest"`` is
                handled at the moment.

        Raises:
            ValueError: ``full_name`` is a UUID (lookup by id is unsupported).
            InvalidArtifactName: ``full_name`` does not have three parts.
        """
        if isinstance(full_name, UUID):
            raise ValueError("Looking up an artifact by UUID is not supported yet.")

        # Validate the name before creating any API state, and split once.
        parts = full_name.split("/")
        if len(parts) != 3:
            raise InvalidArtifactName(
                "An artifact is made up of three parts. The owner which "
                + "is your login or an organizations slug, the project "
                + "slug, and the name of the artifact. These need to be "
                + "seperated by forward splash's (/)."
            )
        self.namespace, self.project_name, self.name = parts

        self._api = ArtifactsApi.instance()
        # Previously this stored the builtin ``id`` function by accident.
        # The real id is resolved on demand by the ``id`` property.
        self._artifact_id: Optional[UUID] = None
        self.tag = tag

        self._new_commit: Commit = Commit(
            uuid4(), status=CommitStatus.Initalized, artifact=self
        )
        self._head: Optional[Commit] = None

    def add(self, local_path: str, artifact_path: str = ""):
        """Stage a local file, directory or glob pattern on the pending commit."""
        self._new_commit.add(local_path, artifact_path=artifact_path)

    def remove(self, artifact_path: str):
        """Unstage ``artifact_path`` from the pending commit."""
        return self._new_commit.remove(artifact_path)

    @functools.lru_cache()
    def __get_entity(self):
        """Fetch (and cache) the artifact entity from the API."""
        try:
            return self._api.artifact_by_name(
                self.namespace, self.project_name, self.name
            )
        except ArtifactExistsError:
            # return self._api.create_artifact(self.namespace, self.project_name, self.name)
            raise

    @functools.lru_cache()
    def __get_commit(self, commit_id: UUID) -> Optional[Commit]:
        """Fetch (and cache) a commit by id and attach it to this artifact."""
        commit = Commit.request(commit_id)
        commit.artifact = self
        return commit

    @property
    def head(self) -> Optional[Commit]:
        """Latest commit of the artifact, or None when there is none."""
        if self.tag == "latest":
            entity = self.__get_entity()
            commit_entity = entity.get("latest")
            if not commit_entity:
                return None
            commit = Commit.from_dict(commit_entity)
            commit.artifact = self
            return commit

        # TODO: get head when a tag is provided
        return None

    @property
    def id(self) -> UUID:
        """Server-side id of the artifact."""
        return UUID(self.__get_entity().get("id"))

    def commit(self, message: str = "", tags: Optional[List[str]] = None):
        """Submit the pending commit and start a new pending one.

        Args:
            message: Commit message.
            tags: Tags for the commit. Defaults to a fresh empty list so
                commits never share one list object.
        """
        tags = [] if tags is None else tags
        self._new_commit.commit(message=message, tags=tags)

        self._head = self._new_commit
        self._new_commit = Commit(
            uuid4(), status=CommitStatus.Initalized, artifact=self
        )

    def download_file(self, artifact_path: str):
        """Download one file of the head commit.

        Raises:
            ArtifactHasNoCommits: The artifact has no head commit.
        """
        if self.head:
            return self.head.download_file(artifact_path)
        raise ArtifactHasNoCommits("No commits exist for this artifact.")

    def download(self):
        """Download every file of the head commit.

        Raises:
            ArtifactHasNoCommits: The artifact has no head commit.
        """
        if self.head:
            return self.head.download()
        raise ArtifactHasNoCommits("No commits exist for this artifact.")
b/datatorch/artifacts/commit/__init__.py @@ -0,0 +1,5 @@ +from .commit import Commit +from .manifest import CommitManifest +from .migrations import CommitMigrations + +__all__ = ["Commit", "CommitManifest", "CommitMigrations"] diff --git a/datatorch/artifacts/commit/commit.py b/datatorch/artifacts/commit/commit.py new file mode 100644 index 0000000..544c87a --- /dev/null +++ b/datatorch/artifacts/commit/commit.py @@ -0,0 +1,379 @@ +from datatorch.uploader.pool import get_upload_pool +from datatorch.artifacts.downloader.pool import download_commit +from enum import Enum +from datatorch.uploader.events import CommitMigrationUploadEvent +import os +import functools +import logging +from pathlib import Path +from os import stat_result +from uuid import UUID +from typing import Dict, List, Optional, TYPE_CHECKING + +from ..hash import create_checksum +from ..api import ArtifactsApi, CommitEntity +from ..directory import ArtifactDirectory + +from .migrations import CommitMigrations +from .manifest import CommitManifest, CommitManifestFile + +from datatorch.uploader import CommitManifestUploadEvent, ArtifactFileUploadEvent + + +if TYPE_CHECKING: + from ..artifact import Artifact + + +logger = logging.getLogger(__name__) + + +class CommitLockedExpection(Exception): + """ Rased when trying to modify a commit that has been committed. """ + + pass + + +class CommitNotUploadedExpection(Exception): + """ + Raised when trying to download a commit that hasn't successfully + uploaded. 
class CommitStatus(Enum):
    """Lifecycle state of a commit.

    Members compare equal to anything whose ``str()`` matches their value,
    so ``CommitStatus.Committed == "COMMITTED"`` is true.
    """

    Uploading = "UPLOADING"
    Initalized = "INITALIZED"
    Failed = "FAILED"
    Committed = "COMMITTED"
    Deleted = "DELETED"

    @classmethod
    def from_str(cls, string: str):
        """Return the member whose value is exactly ``string``.

        The previous ``string in ("UPLOADING")`` checks were substring tests
        on a plain string (the parentheses do not make a tuple), so inputs
        such as ``""`` or ``"UP"`` were accepted as ``Uploading``.

        Raises:
            ValueError: ``string`` is not a known status value.
        """
        try:
            return cls(string)
        except ValueError:
            raise ValueError(f"Unknown commit status: {string!r}") from None

    def __eq__(self, o) -> bool:
        return str(o) == str(self)

    def __hash__(self) -> int:
        # Defining __eq__ alone sets __hash__ to None and makes members
        # unusable in sets and as dict keys. Hash the value so the hash
        # agrees with the string-based equality above.
        return hash(self.value)

    def __str__(self):
        return self.value
+ self.hashed_files: Dict[str, Path] = {} + + @functools.lru_cache() + def load_manifest(self) -> CommitManifest: + if self.__status == CommitStatus.Initalized: + return CommitManifest( + commit_id=self.id, previous_commit_id=self.parent and self.parent.id + ) + + try: + path = self._api.download_commit_manifest(self.id) + return CommitManifest.load(path) + except (FileNotFoundError, ValueError): + raise + + @functools.lru_cache() + def load_previous(self) -> "Optional[Commit]": + if self._parent_id: + c = Commit.request(self._parent_id) + c.artifact = self.artifact + return c + + @property + def parent(self) -> "Optional[Commit]": + if ( + self.__status == CommitStatus.Initalized + or self.__status == CommitStatus.Uploading + ): + self._ensure_artifact() + return self.artifact.head + + if self._parent_id: + return self.load_previous() + + return None + + @property + def is_committed(self): + return self.__status == CommitStatus.Committed + + @property + def is_uploading(self): + return self.__status == CommitStatus.Uploading + + @property + def manifest(self): + return self.load_manifest() + + @property + def manifest_path(self): + return Path(ArtifactDirectory().commit_manifest(self.id)) + + @property + def migration_path(self): + return Path(ArtifactDirectory().commit_migration(self.id)) + + @property + def name(self): + return str(self.id) + + @property + def short_name(self): + return self.name[:8] + + def download_file(self, artifact_path: str) -> str: + self._ensure_downloadable() + # TODO(justin): downloading of single commit file path. + return "" + + def download(self, wait=True): + self._ensure_downloadable() + download_commit(self, wait=wait) + # TODO(justin): downloading of all commit files. 
+ return "" + + def files(self, dir: str = ""): + dir_obj = self.manifest.get_dir(Path(dir)) + return self.manifest.files(dir_obj) + + def get(self, artifact_path: str): + return self.manifest.get(Path(artifact_path)) + + def __setitem__(self, artifact_path: str, local_path: str): + if not self.add(local_path, artifact_path=artifact_path): + raise FileExistsError(f"File or directory not found ({local_path}).") + + def add(self, local_path: str, artifact_path: str = ""): + self._ensure_modifiable() + path_local = Path(local_path).resolve(strict=False) + + if path_local.is_file(): + self.add_file(local_path, artifact_path=artifact_path) + return True + + if path_local.is_dir(): + self.add_dir(local_path, artifact_path=artifact_path) + return True + + # TODO(justin): Allow regex for absolute paths + return self.add_dir(".", artifact_path=artifact_path, pattern=local_path) + + def add_file(self, local_path: str, artifact_path: str = ""): + self._ensure_modifiable() + + path = Path(local_path).resolve(strict=True) + path_artifact = Path(artifact_path or path.name) + print(f"adding file: {path} => {path_artifact}") + + if not path.is_file(): + raise ValueError(f"Path is not a file. '{local_path}' must be a file.") + + lstat = path.lstat() + manifest_file = self.manifest.get_file(path_artifact) + if ( + manifest_file is not None + and manifest_file["lastModified"] == lstat.st_mtime + and manifest_file["size"] == lstat.st_size + ): + # File is already exists and it hasn't changed. + return False + + # Expensive operation. 
def commit(self, message: str = "", tags: Optional[List[str]] = None):
    """
    Creates commit entity, and sends files to thread pool to be
    uploaded.

    Args:
        message: Commit message; an empty string keeps the current message.
        tags: Tags stored on the commit. Defaults to a fresh empty list so
            the stored list is never shared between commits.

    Returns nothing. When the commit introduces no changes relative to its
    parent, nothing is created or uploaded.
    """
    self._ensure_artifact()
    self._ensure_modifiable()

    self.message = message or self.message
    self.tags = [] if tags is None else tags

    artifact: "Artifact" = self.artifact  # type: ignore
    migrations = self.migrations()

    if not migrations.migrations:
        # Nothing to change.
        return

    # TODO(justin): update commit when finished uploading
    self.__status = CommitStatus.Uploading
    self.__create()

    get_upload_pool().processed_commit.add(self)

    # Write and upload manifest.
    self.manifest.write(self.manifest_path)
    CommitManifestUploadEvent.emit(self.manifest_path, self.id)

    # Write and upload migrations.
    migrations.write(self.migration_path)
    CommitMigrationUploadEvent.emit(self.migration_path, self.id)

    # Create an upload event for each newly created file. "DELETED" entries
    # are skipped on purpose: a file removed from this commit may still be
    # used by other commits, so it must not be deleted on the server.
    for file_hash, action in migrations.migrations.items():
        if action == "CREATED":
            path = self.hashed_files[file_hash]
            ArtifactFileUploadEvent.emit(
                path, artifact_id=artifact.id, file_hash=file_hash
            )
You can branch from this " + + "commmit or create a new one to the head of " + + "the artifact." + ) + + def _ensure_downloadable(self): + if self.__status != CommitStatus.Committed: + pass + + def _mark_committed(self): + self.__status = CommitStatus.Committed + self.__update() diff --git a/datatorch/artifacts/commit/manifest.py b/datatorch/artifacts/commit/manifest.py new file mode 100644 index 0000000..66808b8 --- /dev/null +++ b/datatorch/artifacts/commit/manifest.py @@ -0,0 +1,237 @@ +from datatorch.artifacts.directory import ArtifactDirectory +from io import BufferedWriter +from typing import Any, Dict, Generator, Set, Tuple, Union, cast +from uuid import UUID +from typing_extensions import TypedDict + +from pathlib import Path +import fastavro as fa +import os + + +_schema_file = { + "type": "record", + "name": "File", + "fields": [ + {"name": "size", "type": "long"}, + {"name": "hash", "type": "bytes"}, + {"name": "lastModified", "type": "float"}, + ], +} + + +class CommitManifestFile(TypedDict): + size: int + lastModified: float + hash: bytes + + +def _is_manifest_file(entity: Any): + return entity.get("hash") is not None + + +_schema_directory = { + "type": "record", + "name": "Directory", + "fields": [ + { + "name": "dirs", + "type": {"type": "map", "values": "Directory"}, + "default": {}, + }, + { + "name": "files", + "type": {"type": "map", "values": _schema_file}, + "default": {}, + }, + ], +} + + +class CommitManifestDirectory(TypedDict): + files: Dict[str, CommitManifestFile] + dirs: Dict[str, "CommitManifestDirectory"] + + +def _is_manifest_dir(entity: Any): + has_files = entity.get("files") is not None + has_dirs = entity.get("dirs") is not None + return has_files and has_dirs + + +def _tarverse_record( + record: CommitManifestDirectory, directory: str = "" +) -> Generator[Tuple[str, CommitManifestFile], None, None]: + + if record is None: + return + + dirs = record["dirs"] + for d, nr in dirs.items(): + nd = os.path.join(directory, d) + for f in 
class CommitManifest:
    """Tree of the files recorded by a single commit.

    ``root`` is a nested mapping. Every directory node is a dict with a
    ``"files"`` map (name -> file record) and a ``"dirs"`` map (name ->
    directory node). The manifest is persisted as a single Avro record.
    """

    @staticmethod
    def schema():
        """Return the Avro schema used to read and write manifests."""
        return _schema

    @classmethod
    def load(cls, file_path: Path) -> "CommitManifest":
        """Read a manifest from an Avro file.

        Raises:
            FileNotFoundError: ``file_path`` does not exist.
            ValueError: The path is not an Avro file or holds no record.
        """
        path = file_path.resolve(strict=True)

        if not path.is_file() or not fa.is_avro(str(path)):
            raise ValueError("File must be a AVRO file.")

        with open(str(path), "rb") as manifest_file:
            record = next(fa.reader(manifest_file), None)

        if record is None:
            raise ValueError("Manifest does not contain any records.")

        commit_id: bytes = record["commitId"]
        root: "CommitManifestDirectory" = record["root"]
        commit_previous_id: "Union[bytes, None]" = record["previousCommitId"]
        commit_previous_uuid = (
            UUID(bytes=commit_previous_id) if commit_previous_id else None
        )

        return cls(
            UUID(bytes=commit_id),
            root=root,
            previous_commit_id=commit_previous_uuid,
        )

    def __init__(
        self,
        commit_id: UUID,
        root: "Union[CommitManifestDirectory, None]" = None,
        previous_commit_id: "Union[UUID, None]" = None,
    ):
        """Create a manifest; an omitted ``root`` becomes an empty tree."""
        self.commit_id = commit_id
        self.previous_commit_id: "Union[None, UUID]" = previous_commit_id
        empty_root: "CommitManifestDirectory" = {"files": {}, "dirs": {}}
        self.root = root or empty_root

    def get_file(self, artifact_path: Path) -> "Union[CommitManifestFile, None]":
        """Return the file record at ``artifact_path`` or None."""
        parent_dir = self.get_dir(artifact_path.parent)
        if parent_dir is None:
            return None

        return parent_dir.get("files").get(artifact_path.name)

    def get_dir(self, artifact_path: Path) -> "Union[CommitManifestDirectory, None]":
        """Return the directory node at ``artifact_path`` or None.

        ``Path(".")`` addresses the root of the manifest.
        """
        if str(artifact_path) == ".":
            return self.root

        found_dir = self.root
        for part in artifact_path.parts:
            found_dir = found_dir["dirs"].get(part)
            if found_dir is None:
                return None

        return found_dir

    def get(self, artifact_path: Path):
        """Return the file record or directory node at ``artifact_path``."""
        return self.get_file(artifact_path) or self.get_dir(artifact_path)

    def add(
        self,
        artifact_path: Path,
        commit_obj: "Union[CommitManifestDirectory, CommitManifestFile]",
    ):
        """Insert a file record or directory node at ``artifact_path``.

        Missing parent directories are created. Returns True on success.

        Raises:
            ValueError: ``commit_obj`` is neither a file record nor a
                directory node, or the parent could not be resolved.
        """
        path_name = artifact_path.name
        obj_parent = self._make_dirs(artifact_path.parent)

        if obj_parent is None:
            raise ValueError(
                "Failed to insert files into manifest. Parent object not found."
            )

        if _is_manifest_dir(commit_obj):
            obj_parent["dirs"][path_name] = commit_obj  # type: ignore
            return True

        if _is_manifest_file(commit_obj):
            obj_parent["files"][path_name] = commit_obj  # type: ignore
            return True

        raise ValueError("Invalid commit type.")

    def remove(self, artifact_path: Path):
        """Remove the file or directory at ``artifact_path``.

        A name normally lives in only one of the two maps, so both pops use
        a default. Popping without one raised KeyError on every call, even
        after a successful removal. Returns True when something was removed.
        """
        parent = self.get_dir(artifact_path.parent)
        if not parent:
            return False

        removed_file = parent["files"].pop(artifact_path.name, None)
        removed_dir = parent["dirs"].pop(artifact_path.name, None)
        return removed_file is not None or removed_dir is not None

    def _make_dirs(self, path: Path):
        """Return the directory node at ``path``, creating it if needed."""
        # Compare the string form: a Path never equals the str ".".
        if str(path) == ".":
            return self.root

        existing = self.get_dir(path)
        if existing:
            return existing

        if not self.get_dir(path.parent):
            # make parent directory first
            self._make_dirs(path.parent)

        # create directory
        created_dir: "CommitManifestDirectory" = {"files": {}, "dirs": {}}
        self.add(path, created_dir)

        return created_dir

    def files(self, directory: "CommitManifestDirectory" = None):
        """Iterate ``(path, file record)`` pairs under ``directory``.

        Defaults to the whole manifest when ``directory`` is omitted.
        """
        record = directory or self.root
        return _tarverse_record(record)

    def diff(self, manifest: "CommitManifest" = None):
        """Compare file hashes with another manifest.

        Returns:
            ``(created, deleted)``: hex hashes present only in this manifest
            and hex hashes present only in ``manifest``. A missing
            ``manifest`` is treated as empty.
        """
        current_files: Set[str] = {
            record["hash"].hex() for _, record in self.files()
        }
        other_files: Set[str] = {
            record["hash"].hex()
            for _, record in ((manifest and manifest.files()) or [])
        }

        created = current_files.difference(other_files)
        deleted = other_files.difference(current_files)

        return created, deleted

    def writer(self, buffer: BufferedWriter):
        """Serialize the manifest as one Avro record into ``buffer``."""
        ps = fa.parse_schema(self.schema())
        record = dict(
            commitId=self.commit_id.bytes,
            root=self.root,
            previousCommitId=self.previous_commit_id and self.previous_commit_id.bytes,
        )
        fa.writer(buffer, ps, [record])

    def write(self, path: Path):
        """Write the manifest to ``path``, creating parent directories."""
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, "wb") as out:
            self.writer(out)
def writer(self, buffer: BufferedWriter):
    """Serialize this set of migrations as one Avro record into ``buffer``."""
    ps = fa.parse_schema(self.schema())

    record = {
        "commitId": self.commit_id.bytes,
        "fromCommitId": self.from_commit_id and self.from_commit_id.bytes,
        # The schema declares ``createdAt`` as an Avro ``int``, so store
        # whole seconds rather than a float. ``datetime.now().timestamp()``
        # is the real epoch time; ``utcnow().timestamp()`` treats the naive
        # UTC value as local time and is off by the local UTC offset.
        "createdAt": int(datetime.now().timestamp()),
        "migrations": self.migrations,
    }
    fa.writer(buffer, ps, [record])
def create_checksum(file_path: str) -> bytes:
    """Return the raw MD5 digest of the file at ``file_path``.

    The file is read in blocks of 128 times the hash's internal block size,
    so it is never loaded into memory as a whole.

    Args:
        file_path: Path of the file to hash.

    Returns:
        The digest as bytes (not hex encoded).
    """
    # Named ``digest`` rather than ``hash`` to avoid shadowing the builtin.
    digest = hashlib.md5()
    block_size = 128 * digest.block_size
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(block_size), b""):
            digest.update(chunk)
    return digest.digest()
class PythonEnvironmentType(Enum):
    """Python dependency managers an environment can be described with."""

    Pip = "pip"
    PipTools = "pip-tools"
    Pipenv = "pipenv"
    Conda = "conda"

    @classmethod
    def getenvconfig(cls):
        """Placeholder; currently does nothing and returns ``None``.

        The ``cls`` parameter was missing, which made every call fail with
        ``TypeError`` because ``@classmethod`` always passes the class.
        """
        pass
def _artifact_api():
    """Return the shared ``ArtifactsApi``, creating it on first use.

    ``ArtifactsApi`` is imported inside the function rather than at module
    import time.
    """
    global __artifact_api

    # Fast path: the instance was already resolved by an earlier call.
    if __artifact_api is not None:
        return __artifact_api

    from datatorch.artifacts.api import ArtifactsApi

    __artifact_api = ArtifactsApi.instance()
    return __artifact_api
class UploadThreadPool(ThreadPool):
    """Thread pool for upload jobs that also tracks the commits it handled."""

    def __init__(self) -> None:
        super().__init__()
        # Commits whose `_mark_committed()` is invoked on shutdown.
        self.processed_commit: Set[Commit] = set()

    def join(self):
        """Block until every queued task has been marked as done."""
        return self.queue.join()

    def shutdown(self):
        """Show upload progress, reset the byte counters, mark the processed
        commits as committed and then shut the underlying pool down.

        Does nothing when no worker thread is alive.
        """
        from .stats import get_upload_stats

        if self.alive():
            logger.debug("shuting down upload thread pool.")

            stats = get_upload_stats()
            stats.show_progress()
            stats._reset_bytes()

            self._mark_commits_as_committed()

            return super().shutdown()
        return None

    def _mark_commits_as_committed(self):
        """Call `_mark_committed()` on every commit in `processed_commit`."""
        for processed in self.processed_commit:
            processed._mark_committed()
class UploadStatsLock:
    """Two byte counters (total / uploaded), each updated under its own lock."""

    def __init__(self):
        self.__total_lock = Lock()
        self.__total_bytes = 0

        self.__uploaded_lock = Lock()
        self.__uploaded_bytes = 0

    def _add_total_bytes(self, value: int):
        """Add ``value`` to the total byte count."""
        with self.__total_lock:
            self.__total_bytes += value

    def _add_uploaded_bytes(self, value: int):
        """Add ``value`` to the uploaded byte count."""
        with self.__uploaded_lock:
            self.__uploaded_bytes += value

    def _reset_bytes(self):
        """Set both counters back to zero."""
        with self.__total_lock:
            self.__total_bytes = 0
        with self.__uploaded_lock:
            self.__uploaded_bytes = 0

    @property
    def total_bytes(self):
        """Current total byte count."""
        return self.__total_bytes

    @property
    def uploaded_bytes(self):
        """Current uploaded byte count."""
        return self.__uploaded_bytes


class UploadStats(UploadStatsLock):
    """Byte counters for one group of monitored buffers.

    Every counter update is also forwarded to ``category`` when one is given.
    """

    def __init__(self, category: "CategoryUploadStats" = None):
        super().__init__()
        # Monitored readers keyed by the `name` of the wrapped buffer.
        self.files: Dict[str, BufferedReader] = {}
        self.category = category

    def add(self, br: BufferedReader) -> BufferedReader:
        """Wrap ``br`` in a progress-reporting reader and start tracking it.

        Args:
            br: Open buffer to monitor; its ``name`` is used as the key.

        Returns:
            The ``ProgressBufferedReader`` wrapping ``br``.

        Raises:
            ValueError: If a buffer with the same name is already tracked
                and has not been closed.
        """
        file_found = self.files.get(br.name)
        if file_found and not file_found.closed:
            raise ValueError(f"Buffer '{br.name}' is already being monitored.")

        pbr = ProgressBufferedReader(br, self.__update_stats)
        self._add_total_bytes(pbr.total_bytes)
        logger.info(f"monitoring buffer upload progress ({br.name})")
        self.files[br.name] = pbr  # type: ignore
        return pbr  # type: ignore

    def __update_stats(self, read_bytes, _):
        # Reader callback: only the first argument (bytes just read) is used.
        self._add_uploaded_bytes(read_bytes)

    def _add_total_bytes(self, value: int):
        """Add ``value`` here and to the owning category, if any."""
        super()._add_total_bytes(value)
        if self.category:
            self.category._add_total_bytes(value)

    def _add_uploaded_bytes(self, value: int):
        """Add ``value`` here and to the owning category, if any."""
        super()._add_uploaded_bytes(value)
        if self.category:
            self.category._add_uploaded_bytes(value)

    # NOTE: `total_bytes` and `uploaded_bytes` are deliberately inherited.
    # Redefining them here with `self.__total_bytes` would be name-mangled
    # to `_UploadStats__total_bytes`, which is never assigned (the counters
    # live under the `_UploadStatsLock__` prefix), and raise AttributeError.
def show_progress(self):
    """Render a tqdm progress bar until the upload pool reports it is done.

    Returns immediately when the pool is already done. The bar's total is
    refreshed whenever ``self.total_bytes`` changes while waiting.
    """
    import time

    from .pool import get_upload_pool

    is_done = get_upload_pool().done()

    if is_done:
        return

    with tqdm(
        desc="Uploading files",
        total=self.total_bytes,
        unit="B",
        unit_scale=True,
        unit_divisor=1024,
        dynamic_ncols=True,
        file=sys.stdout,
    ) as pbar:
        prev = 0
        while not get_upload_pool().done():
            # Read the counter exactly once per iteration: reading it a
            # second time to update `prev` silently dropped any bytes that
            # were uploaded between the two reads.
            current = self.uploaded_bytes
            pbar.update(current - prev)
            prev = current
            if pbar.total != self.total_bytes:
                pbar.total = self.total_bytes
                pbar.refresh()
            # Yield the CPU instead of spinning while uploads are running.
            time.sleep(0.05)
        # Fill the bar from the real counter (`n`); `last_print_n` is only
        # the value at the last redraw and could overshoot the total.
        pbar.update(pbar.total - pbar.n)
def sizeof_fmt(num: float, suffix="B"):
    """Format ``num`` as a human-readable size using binary (1024) prefixes.

    The value is divided by 1024 until its magnitude drops below 1024 or
    the ``Zi`` prefix has been passed, in which case ``Yi`` is used.
    """
    prefixes = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi")
    position = 0
    while position < len(prefixes):
        if abs(num) < 1024.0:
            return "%3.1f%s%s" % (num, prefixes[position], suffix)
        num /= 1024.0
        position += 1
    # Larger than every listed prefix.
    return "%.1f %s%s" % (num, "Yi", suffix)
class StatusThread(Thread):
    """Thread carrying the two status flags the pool inspects."""

    def __init__(self):
        super().__init__()
        # Set by the pool to ask the thread to stop.
        self.abort = Event()
        # Set by the thread whenever it has no job in hand.
        self.idle = Event()


class ThreadWorker(StatusThread):
    """Daemon thread that pulls jobs from a shared queue and runs them.

    The thread starts itself as soon as it is constructed.
    """

    def __init__(
        self,
        name: str,
        queue: "Queue[ThreadJob]",
    ):
        super().__init__()
        self.name = name
        self.queue = queue
        self.daemon = True
        self.start()

    def run(self):
        """Process jobs until `abort` is set.

        For every job: `run()` is called, then `on_error(ex)` if it raised or
        `on_success()` if it did not, and finally `on_done()`; the queue task
        is marked done in all cases.
        """
        try:
            # `Event.set()` returns None (and *sets* the flag), so testing it
            # made the loop endless; `is_set()` is the actual query.
            while not self.abort.is_set():
                try:
                    # Short timeout so an abort request is noticed promptly.
                    job = self.queue.get(timeout=0.2)
                except Empty:
                    self.idle.set()
                    continue

                self.idle.clear()
                logger.debug(
                    f"[{self.name}] Processing event upload event "
                    + f"(class={job.__class__.__name__}, id={job.id})"
                )

                try:
                    job.run()
                except Exception as ex:
                    job.on_error(ex)
                else:
                    # The success hook was previously never invoked.
                    job.on_success()
                finally:
                    job.on_done()
                    self.queue.task_done()
        finally:
            # A stopped worker holds no job; report it as idle so the pool's
            # `idle()` / `done()` checks are not blocked by a dead thread.
            self.idle.set()


class ThreadPool:
    """Fixed-size pool of `ThreadWorker`s fed from a single queue."""

    def __init__(
        self,
        thread_count: int = None,
        sample_time: float = 0.5,
        shutdown_on_exit: bool = True,
    ) -> None:
        """
        Args:
            thread_count: Number of workers; falls back to `cpu_count()`
                and then to 16 when not given.
            sample_time: Seconds slept between liveness checks in a
                blocking `abort`.
            shutdown_on_exit: Register `shutdown` as an exit hook when the
                workers are created.
        """
        self._sample_time = sample_time
        self.shutdown_on_exit = shutdown_on_exit
        self._thread_count = thread_count or cpu_count() or 16
        self.queue: "Queue[ThreadJob]" = Queue()
        self._threads: List[StatusThread] = []

    def run(self):
        """Create the workers unless some are already alive.

        Returns:
            True when new workers were created, False otherwise.
        """
        if self.alive():
            return False

        logger.debug(f"creating {self._thread_count} upload threads")
        self._threads = [
            ThreadWorker(str(i), self.queue) for i in range(self._thread_count)
        ]

        # Make sure we shutdown if we exit
        if self.shutdown_on_exit:
            exithook.register(self.shutdown)

        return True

    def join(self):
        """Wait for completion of all tasks in queue."""
        return self.queue.join()

    def enqueue(self, file: "ThreadJob", block: bool = False, timeout: float = None):
        """Put a job on the queue (arguments are passed to `Queue.put`)."""
        self.queue.put(file, block=block, timeout=timeout)

    def alive(self):
        """Return True when at least one worker thread is alive."""
        return any(t.is_alive() for t in self._threads)

    def idle(self):
        """Return True when every worker has its idle flag set."""
        return all(t.idle.is_set() for t in self._threads)

    def done(self):
        """Return True when the queue is empty and all workers are idle."""
        return self.queue.qsize() == 0 and self.idle()

    def abort(self, block: bool = False):
        """Ask every worker to stop.

        Args:
            block: When True, poll every `sample_time` seconds until no
                worker is alive any more.
        """
        for worker in self._threads:
            worker.abort.set()

        # The previous `return self._sleep()` inside the loop gave up after
        # a single sleep, whether or not the workers had stopped.
        while block and self.alive():
            self._sleep()

    def _sleep(self):
        time.sleep(self._sample_time)

    def shutdown(self):
        """Finish all queued jobs, then stop the workers and wait for them.

        Does nothing when no worker is alive.
        """
        if not self.alive():
            return

        # Drain first: workers now really stop on abort, so aborting before
        # joining could leave jobs queued and block `join()` forever.
        self.join()
        # Wait for the threads to end so a following `run()` does not
        # mistake stopping workers for a live pool.
        self.abort(block=True)
name not in ignore and name not in dct: + setattr(cls, name, property(make_proxy(name))) diff --git a/examples/Dockerfile b/examples/Dockerfile index 1507fa8..2e892d5 100644 --- a/examples/Dockerfile +++ b/examples/Dockerfile @@ -20,7 +20,7 @@ RUN apt-get update && \ ADD requirements.txt /app/ -RUN conda install -y python=3.6 && \ +RUN conda install -y python=3.8 && \ pip install --upgrade pip && \ pip install -r /app/requirements.txt diff --git a/examples/Untitled.ipynb b/examples/Untitled.ipynb deleted file mode 100644 index ecb5ace..0000000 --- a/examples/Untitled.ipynb +++ /dev/null @@ -1,240 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 13, - "metadata": {}, - "outputs": [], - "source": [ - "%load_ext autoreload\n", - "%autoreload 2" - ] - }, - { - "cell_type": "code", - "execution_count": 14, - "metadata": {}, - "outputs": [], - "source": [ - "from datatorch.agent.flows import Flow\n", - "import os\n", - "import logging\n", - "import asyncio\n", - "\n", - "logging.basicConfig(\n", - " format=\"%(levelname)-8s %(message)s\",\n", - " level=logging.DEBUG,\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": 15, - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "DEBUG Action found locally (datatorch/example-command@1).\n", - "INFO Running action datatorch/example-command@1\n", - "DEBUG Finished running 'Command Example' v1\n", - "DEBUG Action found locally (datatorch/example-commands@1).\n", - "INFO Running action datatorch/example-commands@1\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "b'You are using python v3.8.2 with CPython on .\\n'\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "DEBUG Finished running 'Multiple Command Example' v1\n", - "DEBUG Action found locally (datatorch/example-shell@1).\n", - "INFO Running action datatorch/example-shell@1\n", - "DEBUG Finished running 'Shell Script Example' v1\n", - 
"INFO Successfully completed job.\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "b'line 1\\n'\n", - "b'line 2\\n'\n", - "b'line 3\\n'\n", - "b'line 4\\n'\n", - "b'line 5\\n'\n", - "b'line 6\\n'\n" - ] - } - ], - "source": [ - "flow = Flow.from_yaml('flow.yaml')\n", - "task = asyncio.create_task(flow.run(0))" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "metadata": {}, - "outputs": [], - "source": [ - "import asyncio\n", - "import concurrent.futures\n", - "\n", - "def blocking_io():\n", - " # File operations (such as logging) can block the\n", - " # event loop: run them in a thread pool.\n", - " with open('/dev/urandom', 'rb') as f:\n", - " return f.read(100)\n", - "\n", - "def cpu_bound():\n", - " # CPU-bound operations will block the event loop:\n", - " # in general it is preferable to run them in a\n", - " # process pool.\n", - " return sum(i * i for i in range(10 ** 7))\n", - "\n", - "async def main():\n", - " loop = asyncio.get_running_loop()\n", - "\n", - " ## Options:\n", - "\n", - " # 1. Run in the default loop's executor:\n", - " result = await loop.run_in_executor(\n", - " None, blocking_io)\n", - " print('default thread pool', result)\n", - "\n", - " # 2. Run in a custom thread pool:\n", - " with concurrent.futures.ThreadPoolExecutor() as pool:\n", - " result = await loop.run_in_executor(\n", - " pool, blocking_io)\n", - " print('custom thread pool', result)\n", - "\n", - " # 3. 
Run in a custom process pool:\n", - " with concurrent.futures.ProcessPoolExecutor() as pool:\n", - " result = await loop.run_in_executor(\n", - " pool, cpu_bound)\n", - " print('custom process pool', result)" - ] - }, - { - "cell_type": "code", - "execution_count": 11, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "1594154237952" - ] - }, - "execution_count": 11, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "import time\n", - "round(time.time()*1000)" - ] - }, - { - "cell_type": "code", - "execution_count": 7, - "metadata": {}, - "outputs": [], - "source": [ - "import aiodocker" - ] - }, - { - "cell_type": "code", - "execution_count": 30, - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "ERROR Task exception was never retrieved\n", - "future: :1> exception=DockerError(404, \"pull access denied for hello-world2, repository does not exist or may require 'docker login': denied: requested access to the resource is denied\")>\n", - "Traceback (most recent call last):\n", - " File \"/home/justin/.local/lib/python3.8/site-packages/aiodocker/containers.py\", line 67, in run\n", - " container = await self.create(config, name=name)\n", - " File \"/home/justin/.local/lib/python3.8/site-packages/aiodocker/containers.py\", line 54, in create\n", - " data = await self.docker._query_json(\n", - " File \"/home/justin/.local/lib/python3.8/site-packages/aiodocker/docker.py\", line 293, in _query_json\n", - " async with self._query(\n", - " File \"/home/justin/.local/lib/python3.8/site-packages/aiodocker/utils.py\", line 309, in __aenter__\n", - " resp = await self._coro\n", - " File \"/home/justin/.local/lib/python3.8/site-packages/aiodocker/docker.py\", line 268, in _do_query\n", - " raise DockerError(response.status, json.loads(what.decode(\"utf8\")))\n", - "aiodocker.exceptions.DockerError: DockerError(404, 'No such image: hello-world2:latest')\n", - "\n", - "During handling of 
the above exception, another exception occurred:\n", - "\n", - "Traceback (most recent call last):\n", - " File \"\", line 3, in docker_testing\n", - " container = await docker.containers.run({ 'Image': 'hello-world2' })\n", - " File \"/home/justin/.local/lib/python3.8/site-packages/aiodocker/containers.py\", line 71, in run\n", - " await self.docker.pull(config[\"Image\"])\n", - " File \"/home/justin/.local/lib/python3.8/site-packages/aiodocker/images.py\", line 133, in _handle_list\n", - " async with cm as response:\n", - " File \"/home/justin/.local/lib/python3.8/site-packages/aiodocker/utils.py\", line 309, in __aenter__\n", - " resp = await self._coro\n", - " File \"/home/justin/.local/lib/python3.8/site-packages/aiodocker/docker.py\", line 268, in _do_query\n", - " raise DockerError(response.status, json.loads(what.decode(\"utf8\")))\n", - "aiodocker.exceptions.DockerError: DockerError(404, \"pull access denied for hello-world2, repository does not exist or may require 'docker login': denied: requested access to the resource is denied\")\n" - ] - } - ], - "source": [ - "async def docker_testing():\n", - " docker = aiodocker.Docker()\n", - " container = await docker.containers.run({ 'Image': 'hello-world2' })\n", - " await container.start()\n", - "# logs = await container.log(stdout=True, follow=True)\n", - " async for log in container.log(stdout=True, follow=True):\n", - " print(log.strip().strip('\\n'))\n", - " await container.delete(force=True)\n", - " await docker.close()\n", - "\n", - "task = asyncio.create_task(docker_testing())" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", 
- "pygments_lexer": "ipython3", - "version": "3.8.2" - } - }, - "nbformat": 4, - "nbformat_minor": 4 -} diff --git a/examples/aritfacts.py b/examples/aritfacts.py new file mode 100644 index 0000000..115ff77 --- /dev/null +++ b/examples/aritfacts.py @@ -0,0 +1,12 @@ +from datatorch import Artifact +import logging + + +# logging.basicConfig(level=logging.DEBUG) + + +artifact = Artifact("test/test/test") +artifact.add("./examples/**/*") +# artifact.add("./README.md") +# artifact.add("./pytest.ini") +artifact.commit(message="add readme file") diff --git a/examples/artifacts_test.ipynb b/examples/artifacts_test.ipynb new file mode 100644 index 0000000..a3fce32 --- /dev/null +++ b/examples/artifacts_test.ipynb @@ -0,0 +1,386 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%load_ext autoreload\n", + "%autoreload 2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + " %reload_ext autoreload\n", + "from datatorch.artifacts import Artifact\n", + "\n", + "artifact = Artifact(\"jsbroks/text\", version=\"latest\")\n", + "artifact.add(\"/tmp\")\n", + "artifact.commit()\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "artifact = Artifact(\"text\")\n", + "commit = artifact.new_commit()\n", + "commit.add(\"/tmp\")\n", + "commit.push()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ip" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import IPython as ip" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ip.get_ipython().events" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import requests\n", + "from tqdm import 
trange\n", + "import time\n", + "url = 'https://datatorchartifactstest.blob.core.windows.net/test/artifacts/v1/e84610e7-2a0b-4cd6-aa34-cf9e9afbec82/5210e732-54a0-4b70-a683-d15642d3da2d?sv=2020-02-10&st=2020-11-29T01%3A40%3A18Z&se=2020-11-30T01%3A40%3A18Z&sr=b&sp=racwd&sig=70kvKamQ27sdewLsl1D%2Bt6x2j%2B%2BclrhYQ6OYDGLjbd4%3D'" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "for i in trange(15):\n", + " time.sleep(0.1)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%reload_ext autoreload\n", + "from datatorch.core.upload import stats\n", + "\n", + "\n", + "def callback(size, _):\n", + " pass\n", + " \n", + "fp = open('../tmp.migrations.avro', 'rb')\n", + "data = stats.ProgressBufferedReader(fp, callback)\n", + "print(data.fileno())\n", + "headers = {\n", + " 'x-ms-blob-type': 'BlockBlob'\n", + "}\n", + "r = requests.put(url, data=data, headers=headers)\n", + "\n", + "print(r.request.headers)\n", + "print(r.text)\n", + "data.close()\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(r.text)\n", + "print(r.headers)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%load_ext autoreload\n", + "%autoreload 2\n", + "import requests\n", + "from tqdm import trange\n", + "import time\n", + "\n", + "url = 'https://datatorchartifactstest.blob.core.windows.net/test/fi1.avro?sv=2019-12-12&ss=bfqt&srt=o&sp=rwdlacupx&se=2020-12-01T02:00:28Z&st=2020-11-28T18:00:28Z&spr=https&sig=gYm1SCWo0dnMv3dtkaSYc4%2FhW%2F9wk7CWgJWYPIfRgdg%3D'\n", + "url2 = 'https://datatorchartifactstest.blob.core.windows.net/test/fi2.avro?sv=2019-12-12&ss=bfqt&srt=o&sp=rwdlacupx&se=2020-12-01T02:00:28Z&st=2020-11-28T18:00:28Z&spr=https&sig=gYm1SCWo0dnMv3dtkaSYc4%2FhW%2F9wk7CWgJWYPIfRgdg%3D'\n", + "url3 = 
'https://datatorchartifactstest.blob.core.windows.net/test/fi3.avro?sv=2019-12-12&ss=bfqt&srt=o&sp=rwdlacupx&se=2020-12-01T02:00:28Z&st=2020-11-28T18:00:28Z&spr=https&sig=gYm1SCWo0dnMv3dtkaSYc4%2FhW%2F9wk7CWgJWYPIfRgdg%3D'" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%reload_ext autoreload\n", + "from datatorch.core import upload\n", + "from pathlib import Path\n", + "import logging\n", + "import time\n", + "\n", + "commit_id = '5210e732-54a0-4b70-a683-d15642d3da2d\n", + "\n", + "event = upload.PutUploadEvent(Path('../test.avro'), f'http://localhost:4000/api/file/v2/commit/{commit_id}/file/{md5}')\n", + "upload.get_upload_pool().enqueue(event)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%reload_ext autoreload\n", + "from datatorch.core.upload import stats\n", + "\n", + "with open('../migrations.avro', 'rb') as fp:\n", + " file = stats.ProgressReaderIO(fp)\n", + " file.read()\n", + " fp.read()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from tqdm import tqdm\n", + "import time\n", + "\n", + "with tqdm(\n", + " desc=\"Uploading files\",\n", + " total=1000000,\n", + " unit='B',\n", + " unit_scale=True\n", + ") as pbar:\n", + " for i in range(1000000):\n", + " pbar.update(i/1000/500)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import requests\n", + "from datatorch.core.upload.stats import get_upload_stats\n", + "\n", + "from http.client import HTTPResponse\n", + "HTTPResponse.debuglevel = 1\n", + "\n", + "artifact_id = 'e84610e7-2a0b-4cd6-aa34-cf9e9afbec82'\n", + "commit_id = '5210e732-54a0-4b70-a683-d15642d3da2d'\n", + "md5 = '098f6bcd4621d373cade4e832623b8f6'\n", + "\n", + "fp = open('../test.avro', 'rb')\n", + "\n", + "headers = {\n", + " 'x-ms-blob-type': 'BlockBlob'\n", + "}\n", + 
"import logging\n", + "logging.basicConfig(level=logging.DEBUG)\n", + "\n", + "pbuff = get_upload_stats().add('test', fp)\n", + "r = requests.put(\n", + " f'http://localhost:4000/api/file/v2/artifact/{artifact_id}/file/{md5}', \n", + " data=pbuff, \n", + " headers=headers, \n", + " allow_redirects=True,\n", + " stream=True\n", + ")\n", + "pbuff.close()\n", + "print(r.status_code)\n", + "print(r.text)" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "%load_ext autoreload\n", + "%autoreload 2\n", + "import requests\n", + "from tqdm import trange\n", + "import time" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Uploading files: 100%|██████████| 2.06k/2.06k [00:00<00:00, 2.68kB/s]\n" + ] + } + ], + "source": [ + "# %reload_ext autoreload\n", + "from datatorch import uploader\n", + "from pathlib import Path\n", + "\n", + "\n", + "commit_id = '5210e732-54a0-4b70-a683-d15642d3da2d'\n", + "artifact_id = 'e84610e7-2a0b-4cd6-aa34-cf9e9afbec82'\n", + "md5 = '098f6bcd4621d373cade4e832623b2f0'\n", + "\n", + "\n", + "event = uploader.CommitManifestUploadEvent(Path('../manifest.avro'), commit_id)\n", + "uploader.get_upload_pool().enqueue(event)" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "from datatorch.core import user_settings\n", + "from datatorch.artifacts.api import ArtifactsApi" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'http://localhost:4000/api'" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "user_settings.api_url" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + 
"PosixPath('/home/desktop/.config/datatorch/artifacts/commits/5210e732-54a0-4b70-a683-d15642d3da2d/manifest.avro')" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "('remote-file-71752ccfe2464e99/1d68001ef112fa197e0de1f2cbc5ab57ebf464a5ae676667c4e51b82762627a7/01760a8a758e-0x8D88C2DEDC989B2', {'size': 757, 'hash': b'\\xe4\\xbb\\xd3lbj\\xc5o\\xaf^\\xef\\xcd\\xcd\\xdc\\x03;', 'lastModified': 1606494592.0})\n", + "('remote-file-71752ccfe2464e99/1d68001ef112fa197e0de1f2cbc5ab57ebf464a5ae676667c4e51b82762627a7/01760ac165a3-0x8D88C2DEDC989B2', {'size': 757, 'hash': b'\\xe4\\xbb\\xd3lbj\\xc5o\\xaf^\\xef\\xcd\\xcd\\xdc\\x03;', 'lastModified': 1606498176.0})\n", + "('com.microsoft.teams.linux Crashes/session-state.dat', {'size': 288, 'hash': b'\\x15~Pyki\\xc7\\xac\\x0f\\xb9T,T\\xee\\x1ef', 'lastModified': 1606487424.0})\n", + "('VSFeedbackVSRTCLogs/20201127_143020_16064874207170_VSLSAudio.log', {'size': 324, 'hash': b'<\\xf9\\xd5\\xff>\\xfbD\\x03:\\xf8\\xae5=\\x91\\x0cp', 'lastModified': 1606487424.0})\n", + "('VSFeedbackVSRTCLogs/20201127_143015_Agent.log', {'size': 65793, 'hash': b'!7\\x7ff\\x01\\xdf\\xa3\\x818!\\xd7D\\xd5\\x7f\\x97R', 'lastModified': 1606499456.0})\n", + "('VSFeedbackVSRTCLogs/20201127_143016_16064874162100_VSCode.log', {'size': 16780, 'hash': b'/$\\x12\\xdc.\\xebv\" \\x0cGx]\\x90\\xad\\x08', 'lastModified': 1606487424.0})\n", + "('VSFeedbackVSRTCLogs/20201127_143018_16064874189070_VSLSAudio.log', {'size': 183, 'hash': b']\\x1eA=W\\x9b\\xdcKP\\xf9\\x92\\xc6YF\\xa4\\x1d', 'lastModified': 1606487424.0})\n", + "('VSFeedbackVSRTCLogs/20201127_143020_16064874204560_VSLSAudio.log', {'size': 183, 'hash': b'=5B\\xdfb\\xea\\xcbW\\x0481H\\x06oI{', 
'lastModified': 1606487424.0})\n", + "('VSFeedbackVSRTCLogs/20201127_143013_16064874139020_VSCode.log', {'size': 135220, 'hash': b'\\xd1x\\xd4\\xdc\\xd4\\x03x\\x15\\xbf\\x80gY\\x12F\\x1d\\x15', 'lastModified': 1606499456.0})\n", + "('VSFeedbackVSRTCLogs/20201127_143021_16064874213960_VSLSAudio.log', {'size': 183, 'hash': b'(\\xe9d=\\x7f\\xdc\\xfc\\xe2\\xe4\\x06\\xceB\\xdd\\x19\\x88\\x0f', 'lastModified': 1606487424.0})\n", + "('VSFeedbackVSRTCLogs/20201127_143013_16064874133660_VSCode.log', {'size': 19664, 'hash': b'\\xca\\xb7\\xebs\\x86P\\x9b\\xb8\\xa3N\\x0309\\xbd\\xa7\\xf8', 'lastModified': 1606490624.0})\n", + "('VSFeedbackVSRTCLogs/20201127_143013_16064874135450_VSCode.log', {'size': 15811, 'hash': b'\\xa0\\xfcO6\\xfe\\r}\\x9e\\xb8\\x99\\x00\\xff\\xe2q?\\xed', 'lastModified': 1606490624.0})\n", + "('.org.chromium.Chromium.UL5dN5/Microsoft Teams - Preview1_3.png', {'size': 1714, 'hash': b'0O\\x80\\xa7%qCg\\xc5\\x06Y\\x1d\\xcf\\xbaf\\xca', 'lastModified': 1606487424.0})\n", + "('pyright-9712-ckeJj0dqsvPn/builtins-9712-gH8XbpYyLX9y-.py', {'size': 216014, 'hash': b'\\xb0ev\\x02\\xdf5\\xb9\\xe9\\x8d\\xe3\\xc3\\xe5\\x08c\\x1e\\x17', 'lastModified': 1606488064.0})\n", + "('.com.google.Chrome.hM2k2X/status_icon_0.png', {'size': 7291, 'hash': b'\\xa0\\xc9\\x93\\x88\\xc0\\xd6\\xf3\\xa3j~\\xccTKs v', 'lastModified': 1606487424.0})\n", + "('config-err-4tyKRQ', {'size': 0, 'hash': b'\\xd4\\x1d\\x8c\\xd9\\x8f\\x00\\xb2\\x04\\xe9\\x80\\t\\x98\\xec\\xf8B~', 'lastModified': 1606487424.0})\n", + "('.X0-lock', {'size': 11, 'hash': b'z\\xba\\xf1\\xad\\x12\\x83G\\xee\\xbc\\x01\\xaa\\xad\\xfb\\xaf\\x95\\x8f', 'lastModified': 1606487296.0})\n" + ] + } + ], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + 
"file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.5" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/examples/import-coco.py b/examples/import-coco.py deleted file mode 100644 index 7889c35..0000000 --- a/examples/import-coco.py +++ /dev/null @@ -1,135 +0,0 @@ -""" -This script imports COCO formated annotations into DataTorch -""" -import numpy as np - -from pycocotools.coco import COCO -from datatorch.api import ( - ApiClient, - Where, - Label, - Annotation, - Segmentations, - BoundingBox, -) - - -def print_project(project): - print("=" * 50) - print(f"Project: {project.name}") - names = [label.name for label in labels] - print(f'Labels: {" ".join(names)}') - print("=" * 50) - - -if __name__ == "__main__": - - # ----- Settings ----- - - # DataTorch project ID - # You can obtain this by going into the settings view of the project - project_id = "DATATORCH_PROJECT_ID" - - # Path to annotation file - anno_file = "path/to/coco" - - # Only add annotations above this score will be imported. This is done by - # check for the property 'score' on each annotation to determine if it - # should be imported - min_score = 0.8 - - # If both import options are enabled the script will create two sources per - # annotations, one as a segmentation the other as a bounding box. 
- - # Import bounding boxes from the coco format - import_bbox = False - - # Import segmentations from the coco format - import_segmentation = True - - # ----- Script ----- - - # Connect to DataTorch - api = ApiClient() - project = api.project(project_id) - labels = project.labels() - - print_project(project) - - def category_in_project(name: str) -> Label: - """ Returns category in project """ - for cat in labels: - if cat.name.lower() == name.lower(): - return cat - return None - - # Load coco categories - coco = COCO(anno_file) - cats = coco.loadCats(coco.getCatIds()) - names = [cat["name"] for cat in cats] - - label_maping = {} - for cat in cats: - name = cat["name"] - found = category_in_project(name) - - if not found: - print(f'label "{name}" not found in project') - else: - label_maping[cat["id"]] = found - - print(f'COCO Categories: {" ".join(names)}') - - for img_id in coco.getImgIds(): - (img,) = coco.loadImgs(img_id) - name = img["file_name"] - - find_by_name = Where(name=name, mimetype__starts_with="image") - dt_files = project.files(find_by_name) - files_count = len(dt_files) - - if files_count > 1: - print(f"\nMultiple files found with name {name}, skipping") - continue - - if files_count == 0: - print(f"\nNo files found with name {name}, skipping") - continue - - print(f"\n{name} found. 
Importing annotations") - dt_file = dt_files[0] - - # load file annotations - anno_ids = coco.getAnnIds(imgIds=img["id"]) - annos = coco.loadAnns(anno_ids) - - for anno in annos: - # Create annotation - if anno.get("datatorch_id") is not None: - print(f'Annotation {anno["id"]} already exists in DataTorch, skipping') - - score = anno.get("score", 1) - if score < min_score: - continue - - label = label_maping[anno["category_id"]] - if label is None: - continue - - dt_anno = Annotation(label=label) - if import_bbox: - bbox = BoundingBox.xywh(*anno["bbox"]) - dt_anno.add(bbox) - - if import_segmentation: - polygons = anno["segmentation"] - # Format segmentation to the correct DataTorch format which is: - # [ - # [ [x1,y1], [x2,y2], ... ], - # [ [x1,y1], [x2,y2], ... ] - # ] - path_data = [np.reshape(polygon, (-1, 2)) for polygon in polygons] - segmentation = Segmentations(path_data=path_data) - dt_anno.add(segmentation) - - dt_file.add(dt_anno) diff --git a/examples/notebooks/artifact.ipynb b/examples/notebooks/artifact.ipynb new file mode 100644 index 0000000..a0d2674 --- /dev/null +++ b/examples/notebooks/artifact.ipynb @@ -0,0 +1,109 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "The autoreload extension is already loaded. 
To reload it, use:\n", + " %reload_ext autoreload\n" + ] + } + ], + "source": [ + "%load_ext autoreload\n", + "%autoreload 2" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "from datatorch import Artifact" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The `Artifact` class is used to create an artifact and to add, delete, or download its files." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "artifact = Artifact(\"test/test/test\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can get an existing artifact by passing in the artifact's name. If no version is specified, the `latest` artifact commit is selected automatically.\n", + "\n", + "Here are some examples:\n", + "```python\n", + "# Gets latest version of the Artifact (last commit)\n", + "Artifact(\"username/project/name\") # tag=latest\n", + "\n", + "# Get V2 of Artifact.\n", + "Artifact(\"username/project/name\", tag=\"v2\")\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "adding file: /home/desktop/Git/python/examples/data/coco-test.json => coco-test.json\n" + ] + } + ], + "source": [ + "artifact.add(\"../data\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.5" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/examples/testing.py b/examples/testing.py
deleted file mode 100644 index 409f7c6..0000000 --- a/examples/testing.py +++ /dev/null @@ -1,20 +0,0 @@ -from datatorch.api import ApiClient, Annotation, BoundingBox, Where - -client = ApiClient( - api_key="fa2c325a-fd78-4bc6-827f-90242530bebd", api_url="http://localhost:4000" -) - -project = client.project("68ca53cc-5820-4c01-9bf3-abc9e384fff4") -labels = project.labels() - -files = project.files(where=Where(path__starts_with="dataset")) -print(len(files)) - -# f = client.file("7b1e2d05-89ed-40fe-b2aa-9368633b8748") - -# anno = Annotation(label=labels[0]) -# anno.add(BoundingBox.create(0, 0, 100, 100)) - -# f.add(anno) -# print(anno.__dict__) -# print(f.__dict__) diff --git a/setup.py b/setup.py index 3c29d17..b905f5e 100644 --- a/setup.py +++ b/setup.py @@ -18,6 +18,7 @@ "Jinja2~=2.0", "PyYAML~=5.0", "aiostream~=0.4.0", + "fastavro~=1.2", ] requirements_agents = []