Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions datadog_sync/commands/shared/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,86 @@ def click_config_file_provider(ctx: Context, opts: CustomOptionClass, value: Non
help=f"AWS session token, only used if --storage-type is '{constants.S3_STORAGE_TYPE}'",
cls=CustomOptionClass,
),
# GCS options
option(
"--gcs-bucket-name",
envvar=constants.GCS_BUCKET_NAME,
required=False,
help=f"GCS bucket name, only used if --storage-type is '{constants.GCS_STORAGE_TYPE}'",
cls=CustomOptionClass,
),
option(
"--gcs-bucket-key-prefix-source",
default=constants.SOURCE_PATH_DEFAULT,
show_default=True,
envvar=constants.GCS_BUCKET_KEY_PREFIX_SOURCE,
required=False,
help=f"GCS bucket source key prefix, only used if --storage-type is '{constants.GCS_STORAGE_TYPE}'",
cls=CustomOptionClass,
),
option(
"--gcs-bucket-key-prefix-destination",
default=constants.DESTINATION_PATH_DEFAULT,
show_default=True,
envvar=constants.GCS_BUCKET_KEY_PREFIX_DESTINATION,
required=False,
help=f"GCS bucket destination key prefix, only used if --storage-type is '{constants.GCS_STORAGE_TYPE}'",
cls=CustomOptionClass,
),
option(
"--gcs-service-account-key-file",
envvar=constants.GCS_SERVICE_ACCOUNT_KEY_FILE,
required=False,
help=f"Path to GCS service account key file, only used if --storage-type is '{constants.GCS_STORAGE_TYPE}'",
cls=CustomOptionClass,
),
# Azure options
option(
"--azure-container-name",
envvar=constants.AZURE_CONTAINER_NAME,
required=False,
help=f"Azure container name, only used if --storage-type is '{constants.AZURE_STORAGE_TYPE}'",
cls=CustomOptionClass,
),
option(
"--azure-container-key-prefix-source",
default=constants.SOURCE_PATH_DEFAULT,
show_default=True,
envvar=constants.AZURE_CONTAINER_KEY_PREFIX_SOURCE,
required=False,
help=f"Azure container source key prefix, only used if --storage-type is '{constants.AZURE_STORAGE_TYPE}'",
cls=CustomOptionClass,
),
option(
"--azure-container-key-prefix-destination",
default=constants.DESTINATION_PATH_DEFAULT,
show_default=True,
envvar=constants.AZURE_CONTAINER_KEY_PREFIX_DESTINATION,
required=False,
help=f"Azure container destination key prefix, only used if --storage-type is '{constants.AZURE_STORAGE_TYPE}'",
cls=CustomOptionClass,
),
option(
"--azure-storage-account-name",
envvar=constants.AZURE_STORAGE_ACCOUNT_NAME,
required=False,
help=f"Azure storage account name, only used if --storage-type is '{constants.AZURE_STORAGE_TYPE}'",
cls=CustomOptionClass,
),
option(
"--azure-storage-account-key",
envvar=constants.AZURE_STORAGE_ACCOUNT_KEY,
required=False,
help=f"Azure storage account key, only used if --storage-type is '{constants.AZURE_STORAGE_TYPE}'",
cls=CustomOptionClass,
),
option(
"--azure-storage-connection-string",
envvar=constants.AZURE_STORAGE_CONNECTION_STRING,
required=False,
help=f"Azure storage connection string, only used if --storage-type is '{constants.AZURE_STORAGE_TYPE}'",
cls=CustomOptionClass,
),
]


Expand Down
28 changes: 28 additions & 0 deletions datadog_sync/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@

LOCAL_STORAGE_TYPE = "local"
S3_STORAGE_TYPE = "s3"
GCS_STORAGE_TYPE = "gcs"
AZURE_STORAGE_TYPE = "azure"
STORAGE_TYPES = [
LOCAL_STORAGE_TYPE,
S3_STORAGE_TYPE,
GCS_STORAGE_TYPE,
AZURE_STORAGE_TYPE,
]

DD_DESTINATION_RESOURCES_PATH = "DD_DESTINATION_RESOURCES_PATH"
Expand All @@ -53,6 +57,30 @@
"aws_session_token",
]

# GCS env parameter names
# Names of the environment variables that back the corresponding
# --gcs-* CLI options (see commands/shared/options.py).
GCS_BUCKET_NAME = "GCS_BUCKET_NAME"
GCS_BUCKET_KEY_PREFIX_SOURCE = "GCS_BUCKET_KEY_PREFIX_SOURCE"
GCS_BUCKET_KEY_PREFIX_DESTINATION = "GCS_BUCKET_KEY_PREFIX_DESTINATION"
GCS_SERVICE_ACCOUNT_KEY_FILE = "GCS_SERVICE_ACCOUNT_KEY_FILE"
# kwargs keys copied into the storage config dict when --storage-type is "gcs";
# a warning is logged for each one that is missing.
GCS_CONFIG_PROPERTIES = [
    "gcs_bucket_name",
    "gcs_service_account_key_file",
]

# Azure env parameter names
# Names of the environment variables that back the corresponding
# --azure-* CLI options (see commands/shared/options.py).
AZURE_CONTAINER_NAME = "AZURE_CONTAINER_NAME"
AZURE_CONTAINER_KEY_PREFIX_SOURCE = "AZURE_CONTAINER_KEY_PREFIX_SOURCE"
AZURE_CONTAINER_KEY_PREFIX_DESTINATION = "AZURE_CONTAINER_KEY_PREFIX_DESTINATION"
AZURE_STORAGE_ACCOUNT_NAME = "AZURE_STORAGE_ACCOUNT_NAME"
AZURE_STORAGE_ACCOUNT_KEY = "AZURE_STORAGE_ACCOUNT_KEY"
AZURE_STORAGE_CONNECTION_STRING = "AZURE_STORAGE_CONNECTION_STRING"
# kwargs keys copied into the storage config dict when --storage-type is "azure";
# a warning is logged for each one that is missing. Only a subset is required at
# runtime: either the connection string, or the account name (with optional key).
AZURE_CONFIG_PROPERTIES = [
    "azure_container_name",
    "azure_storage_account_name",
    "azure_storage_account_key",
    "azure_storage_connection_string",
]

# Default variables
DEFAULT_API_URL = "https://api.datadoghq.com"

Expand Down
36 changes: 36 additions & 0 deletions datadog_sync/utils/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@
from datadog_sync.constants import (
Command,
AWS_CONFIG_PROPERTIES,
AZURE_CONFIG_PROPERTIES,
AZURE_STORAGE_TYPE,
DESTINATION_PATH_DEFAULT,
DESTINATION_PATH_PARAM,
FALSE,
FORCE,
GCS_CONFIG_PROPERTIES,
GCS_STORAGE_TYPE,
LOCAL_STORAGE_TYPE,
LOGGER_NAME,
RESOURCE_PER_FILE,
Expand Down Expand Up @@ -230,6 +234,38 @@ def build_config(cmd: Command, **kwargs: Optional[Any]) -> Configuration:
if not property_value:
logger.warning(f"Missing AWS configuration parameter: {aws_config_property}")
config[aws_config_property] = property_value
elif storage_type == GCS_STORAGE_TYPE:
logger.info("Using GCS to store state files")
storage_type = StorageType.GCS_BUCKET

local_source_resources_path = kwargs.get(SOURCE_PATH_PARAM, SOURCE_PATH_DEFAULT)
source_resources_path = kwargs.get("gcs_bucket_key_prefix_source", local_source_resources_path)

local_destination_resources_path = kwargs.get(DESTINATION_PATH_PARAM, DESTINATION_PATH_DEFAULT)
destination_resources_path = kwargs.get("gcs_bucket_key_prefix_destination", local_destination_resources_path)

for gcs_config_property in GCS_CONFIG_PROPERTIES:
property_value = kwargs.get(gcs_config_property, None)
if not property_value:
logger.warning(f"Missing GCS configuration parameter: {gcs_config_property}")
config[gcs_config_property] = property_value
elif storage_type == AZURE_STORAGE_TYPE:
logger.info("Using Azure Blob Storage to store state files")
storage_type = StorageType.AZURE_BLOB_CONTAINER

local_source_resources_path = kwargs.get(SOURCE_PATH_PARAM, SOURCE_PATH_DEFAULT)
source_resources_path = kwargs.get("azure_container_key_prefix_source", local_source_resources_path)

local_destination_resources_path = kwargs.get(DESTINATION_PATH_PARAM, DESTINATION_PATH_DEFAULT)
destination_resources_path = kwargs.get(
"azure_container_key_prefix_destination", local_destination_resources_path
)

for azure_config_property in AZURE_CONFIG_PROPERTIES:
property_value = kwargs.get(azure_config_property, None)
if not property_value:
logger.warning(f"Missing Azure configuration parameter: {azure_config_property}")
config[azure_config_property] = property_value
elif storage_type == LOCAL_STORAGE_TYPE:
logger.info("Using local filesystem to store state files")
storage_type = StorageType.LOCAL_FILE
Expand Down
22 changes: 22 additions & 0 deletions datadog_sync/utils/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
)
from datadog_sync.utils.storage._base_storage import BaseStorage, StorageData
from datadog_sync.utils.storage.aws_s3_bucket import AWSS3Bucket
from datadog_sync.utils.storage.azure_blob_container import AzureBlobContainer
from datadog_sync.utils.storage.gcs_bucket import GCSBucket
from datadog_sync.utils.storage.local_file import LocalFile
from datadog_sync.utils.storage.storage_types import StorageType

Expand All @@ -39,6 +41,26 @@ def __init__(self, type_: StorageType = StorageType.LOCAL_FILE, **kwargs: object
config=config,
resource_per_file=resource_per_file,
)
elif type_ == StorageType.GCS_BUCKET:
config = kwargs.get("config", {})
if not config:
raise ValueError("GCS configuration not found")
self._storage: BaseStorage = GCSBucket(
source_resources_path=source_resources_path,
destination_resources_path=destination_resources_path,
config=config,
resource_per_file=resource_per_file,
)
elif type_ == StorageType.AZURE_BLOB_CONTAINER:
config = kwargs.get("config", {})
if not config:
raise ValueError("Azure configuration not found")
self._storage: BaseStorage = AzureBlobContainer(
source_resources_path=source_resources_path,
destination_resources_path=destination_resources_path,
config=config,
resource_per_file=resource_per_file,
)
else:
raise NotImplementedError(f"Storage type {type_} not implemented")

Expand Down
2 changes: 2 additions & 0 deletions datadog_sync/utils/storage/aws_s3_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def __init__(
self.client = boto3.client("s3")

self.bucket_name = config.get("aws_bucket_name", "")
if not self.bucket_name:
raise ValueError("AWS S3 bucket name is required")

def get(self, origin: Origin) -> StorageData:
log.info("AWS S3 get called")
Expand Down
115 changes: 115 additions & 0 deletions datadog_sync/utils/storage/azure_blob_container.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Unless explicitly stated otherwise all files in this repository are licensed
# under the 3-clause BSD style license (see LICENSE).
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2019 Datadog, Inc.

import json
import logging

from azure.storage.blob import BlobServiceClient, ContainerClient
from azure.identity import DefaultAzureCredential

from datadog_sync.constants import (
Origin,
DESTINATION_PATH_DEFAULT,
LOGGER_NAME,
SOURCE_PATH_DEFAULT,
)
from datadog_sync.utils.storage._base_storage import BaseStorage, StorageData


log = logging.getLogger(LOGGER_NAME)


class AzureBlobContainer(BaseStorage):
    """State storage backend that persists resource state as JSON blobs in an
    Azure Blob Storage container.

    Blob layout mirrors the local-file backend: one blob per resource type at
    ``<prefix>/<resource_type>.json``, or one blob per resource at
    ``<prefix>/<resource_type>.<id>.json`` when ``resource_per_file`` is set.

    Credential precedence in ``__init__`` (first match wins):
      1. ``azure_storage_connection_string``
      2. ``azure_storage_account_name`` + ``azure_storage_account_key``
      3. ``azure_storage_account_name`` alone, via ``DefaultAzureCredential``
         (environment variables, managed identity, Azure CLI, ...)
    """

    def __init__(
        self,
        source_resources_path=SOURCE_PATH_DEFAULT,
        destination_resources_path=DESTINATION_PATH_DEFAULT,
        resource_per_file=False,
        config=None,
    ) -> None:
        """Build a ``ContainerClient`` from *config*.

        Args:
            source_resources_path: blob name prefix for source-org state.
            destination_resources_path: blob name prefix for destination-org state.
            resource_per_file: write one blob per resource instead of one per type.
            config: dict holding the ``azure_*`` settings (see class docstring).

        Raises:
            ValueError: if *config* is falsy, the container name is missing, or
                no usable credential combination is present.
        """
        log.info("Azure Blob Storage init called")
        super().__init__()
        self.source_resources_path = source_resources_path
        self.destination_resources_path = destination_resources_path
        self.resource_per_file = resource_per_file
        if not config:
            raise ValueError("No Azure configuration passed in")

        container_name = config.get("azure_container_name", "")
        if not container_name:
            raise ValueError("Azure container name is required")
        connection_string = config.get("azure_storage_connection_string", None)
        account_name = config.get("azure_storage_account_name", None)
        account_key = config.get("azure_storage_account_key", None)

        if connection_string:
            log.info("Azure Blob Storage configured with connection string")
            self.container_client = ContainerClient.from_connection_string(
                conn_str=connection_string,
                container_name=container_name,
            )
        elif account_name and account_key:
            log.info("Azure Blob Storage configured with account name and key")
            account_url = f"https://{account_name}.blob.core.windows.net"
            blob_service_client = BlobServiceClient(account_url=account_url, credential=account_key)
            self.container_client = blob_service_client.get_container_client(container_name)
        elif account_name:
            log.info("Azure Blob Storage configured with default credentials")
            account_url = f"https://{account_name}.blob.core.windows.net"
            blob_service_client = BlobServiceClient(account_url=account_url, credential=DefaultAzureCredential())
            self.container_client = blob_service_client.get_container_client(container_name)
        else:
            raise ValueError("Azure storage requires at least a connection string or storage account name")

    def get(self, origin: Origin) -> StorageData:
        """Load state blobs for *origin* (source, destination, or both).

        Returns:
            StorageData populated with the parsed JSON contents; blobs with
            invalid JSON are skipped with a warning.
        """
        log.info("Azure Blob Storage get called")
        data = StorageData()

        if origin in [Origin.SOURCE, Origin.ALL]:
            self._load_prefix(self.source_resources_path, data.source, "source")
        if origin in [Origin.DESTINATION, Origin.ALL]:
            self._load_prefix(self.destination_resources_path, data.destination, "destination")

        return data

    def _load_prefix(self, prefix, target, label) -> None:
        """Merge every ``.json`` blob under *prefix* into the *target* mapping.

        *label* ("source"/"destination") is only used in warning messages.
        """
        for blob in self.container_client.list_blobs(name_starts_with=prefix):
            if not blob.name.endswith(".json"):
                continue
            # Blob names are "<prefix>/<resource_type>[.<id>].json"; the
            # resource type is the final path segment before the first dot.
            # NOTE(review): a dot inside *prefix* itself would break this —
            # same limitation as the other bucket backends.
            resource_type = blob.name.split(".")[0].split("/")[-1]
            try:
                content = self.container_client.download_blob(blob.name).readall().decode("utf-8")
                target[resource_type].update(json.loads(content))
            except json.decoder.JSONDecodeError:
                log.warning(f"invalid json in azure {label} resource file: {resource_type}")

    def put(self, origin: Origin, data: StorageData) -> None:
        """Write state for *origin* back to the container, overwriting blobs."""
        log.info("Azure Blob Storage put called")
        if origin in [Origin.SOURCE, Origin.ALL]:
            self._dump_prefix(self.source_resources_path, data.source)
        if origin in [Origin.DESTINATION, Origin.ALL]:
            self._dump_prefix(self.destination_resources_path, data.destination)

    def _dump_prefix(self, prefix, resources) -> None:
        """Serialize *resources* ({type: {id: resource}}) under *prefix*.

        Honors ``self.resource_per_file``: one blob per resource id, or one
        combined blob per resource type.
        """
        for resource_type, resource_data in resources.items():
            base_key = f"{prefix}/{resource_type}"
            if self.resource_per_file:
                for _id, resource in resource_data.items():
                    key = f"{base_key}.{_id}.json"
                    self.container_client.upload_blob(name=key, data=json.dumps({_id: resource}), overwrite=True)
            else:
                key = f"{base_key}.json"
                self.container_client.upload_blob(name=key, data=json.dumps(resource_data), overwrite=True)
Loading