diff --git a/import-automation/executor/requirements.txt b/import-automation/executor/requirements.txt
index 9ae77bed1a..43a51fe0c5 100644
--- a/import-automation/executor/requirements.txt
+++ b/import-automation/executor/requirements.txt
@@ -7,7 +7,6 @@ chardet
 chromedriver_py
 croniter
 dataclasses
-datacommons
 datacommons_client
 db-dtypes
 duckdb
diff --git a/scripts/earthengine/process_events.py b/scripts/earthengine/process_events.py
index dbd2671a31..2d6e66d8c4 100644
--- a/scripts/earthengine/process_events.py
+++ b/scripts/earthengine/process_events.py
@@ -21,7 +21,6 @@ import pickle
 import sys
 import time
 
-import datacommons as dc
 from absl import app
 from absl import flags
 
@@ -74,7 +73,7 @@ from counters import Counters
 from latlng_recon_geojson import LatLng2Places
 from config_map import ConfigMap
-from dc_api_wrapper import dc_api_batched_wrapper
+from dc_api_wrapper import dc_api_get_property
 
 # List of place types in increasing order of preference for name.
 # This is used to pick the name of the place from the list of affectedPlaces
@@ -699,13 +698,11 @@ def prefetch_placeid_property(self, prop: str, place_ids: list = None):
         if lookup_places:
             start_time = time.perf_counter()
-            cache_dict.update(
-                dc_api_batched_wrapper(function=dc.get_property_values,
-                                       dcids=lookup_places,
-                                       args={
-                                           'prop': prop,
-                                       },
-                                       config=self._config))
+            place_props = dc_api_get_property(lookup_places, prop)
+            for placeid, prop_value in place_props.items():
+                value = prop_value.get(prop)
+                if value:
+                    cache_dict[placeid] = value
             end_time = time.perf_counter()
             self._counters.add_counter(f'dc_api_lookup_{prop}_count',
                                        len(lookup_places))
diff --git a/scripts/earthengine/utils.py b/scripts/earthengine/utils.py
index 2f014c7dd2..21b3def784 100644
--- a/scripts/earthengine/utils.py
+++ b/scripts/earthengine/utils.py
@@ -26,7 +26,6 @@ from typing import Union
 
 from absl import logging
-import datacommons as dc
 from dateutil.relativedelta import relativedelta
 from geopy import distance
 import s2sphere
@@ -41,7 +40,8 @@
     os.path.join(os.path.dirname(os.path.dirname(_SCRIPTS_DIR)), 'util'))
 
 from config_map import ConfigMap, read_py_dict_from_file, write_py_dict_to_file
-from dc_api_wrapper import dc_api_wrapper
+from dc_api_wrapper import dc_api_get_node_property
+from string_utils import str_to_list
 
 # Constants
 _MAX_LATITUDE = 90.0
@@ -368,23 +368,13 @@ def place_id_to_lat_lng(placeid: str,
         # Get the lat/lng from the DC API
         latlng = []
         for prop in ['latitude', 'longitude']:
-            # dc.utils._API_ROOT = 'http://autopush.api.datacommons.org'
-            # resp = dc.get_property_values([placeid], prop)
-            resp = dc_api_wrapper(
-                function=dc.get_property_values,
-                args={
-                    'dcids': [placeid],
-                    'prop': prop,
-                },
-                use_cache=True,
-                api_root=_DC_API_ROOT,
-            )
+            resp = dc_api_get_node_property([placeid], prop)
             if not resp or placeid not in resp:
                 return (0, 0)
-            values = resp[placeid]
-            if not len(values):
+            # Values come back as a quoted, comma separated string.
+            values = str_to_list(resp[placeid].get(prop, ''))
+            if not values or not values[0]:
                 return (0, 0)
-            latlng.append(float(values[0]))
+            latlng.append(float(values[0].strip('"')))
         lat = latlng[0]
         lng = latlng[1]
         return (lat, lng)
diff --git a/scripts/eurostat/health_determinants/common/dcid_existence.py b/scripts/eurostat/health_determinants/common/dcid_existence.py
index f6fe6e2f17..7bd3a5deb1 100644
--- a/scripts/eurostat/health_determinants/common/dcid_existence.py
+++ b/scripts/eurostat/health_determinants/common/dcid_existence.py
@@ -14,6 +14,13 @@
 """
 Script to check whether properties/dcids/nodes exist in datacommons.org.
""" +import os +import sys + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.join(_SCRIPT_DIR, '../../../../util')) + +from dc_api_wrapper import dc_api_is_defined_dcid import datacommons @@ -30,17 +37,7 @@ def check_dcid_existence(nodes: list) -> dict: dict: Status dictionary. """ # pylint: disable=protected-access - nodes_response = datacommons.get_property_values( - nodes, - "typeOf", - out=True, - value_type=None, - limit=datacommons.utils._MAX_LIMIT) + # pylint: disable=protected-access + node_status = dc_api_is_defined_dcid(nodes) # pylint: enable=protected-access - node_status = {} - for node, value in nodes_response.items(): - if value == []: - node_status[node] = False - else: - node_status[node] = True return node_status diff --git a/scripts/fires/wfigs_data.py b/scripts/fires/wfigs_data.py index 60d21b03d9..1ce395bdfb 100644 --- a/scripts/fires/wfigs_data.py +++ b/scripts/fires/wfigs_data.py @@ -27,7 +27,6 @@ import pickle import re import requests -import datacommons as dc import sys _SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__)) diff --git a/scripts/glims/rgi/process.py b/scripts/glims/rgi/process.py index 450b4abd14..557eea8c5f 100644 --- a/scripts/glims/rgi/process.py +++ b/scripts/glims/rgi/process.py @@ -18,6 +18,11 @@ import json import glob import os +import sys + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.join(_SCRIPT_DIR, '../../../util')) +from dc_api_wrapper import dc_api_get_values from shapely import geometry from absl import app from absl import flags @@ -30,14 +35,14 @@ def _load_geojsons(): countries = dc.get_places_in(['Earth'], 'Country')['Earth'] - resp = dc.get_property_values(countries, 'geoJsonCoordinatesDP2') + resp = dc_api_get_values(countries, 'geoJsonCoordinatesDP2') geojsons = {} for p, gj in resp.items(): if not gj: continue geojsons[p] = geometry.shape(json.loads(gj[0])) print('Got', len(geojsons), 'geojsons!') - cip = dc.get_property_values(countries, 'containedInPlace') + cip = dc_api_get_values(countries, 'containedInPlace') return geojsons, cip diff --git a/scripts/noaa/gpcc_spi/create_place_to_grid_area_mapping.py b/scripts/noaa/gpcc_spi/create_place_to_grid_area_mapping.py index 19d0b2560b..bdd87c1ede 100644 --- a/scripts/noaa/gpcc_spi/create_place_to_grid_area_mapping.py +++ b/scripts/noaa/gpcc_spi/create_place_to_grid_area_mapping.py @@ -23,15 +23,22 @@ """ from shapely import geometry -import datacommons as dc import concurrent.futures from typing import List, Optional import json import csv +import sys +import os from absl import flags from absl import app +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(_SCRIPT_DIR) +sys.path.append(os.path.join(_SCRIPT_DIR.split('/data/', 1)[0], 'data', 'util')) + +import dc_api_wrapper as dc_api + FLAGS = flags.FLAGS flags.DEFINE_string('gpcc_spi_places_with_csv', @@ -59,20 +66,27 @@ def construct_one_degree_grid_polygons() -> List[geometry.box]: def get_place_by_type(parent_places, places_types: List[str]) -> List[str]: """Return the place ids of all places contained in a set of parent places.""" + dc_api_client = dc_api.get_datacommons_client() all_types_of_places = [] for place_type in places_types: - parent_place_to_places = dc.get_places_in(parent_places, place_type) - for places in parent_place_to_places.values(): - for place in places: - all_types_of_places.extend(place) + parent_place_to_places = dc_api.dc_api_batched_wrapper( + dc_api_client.node.fetch_place_descendants, + 
+            parent_places, {'descendants_type': place_type},
+            dcid_arg_kw='place_dcids')
+        for child_places in parent_place_to_places.values():
+            for place in child_places:
+                place_dcid = place.get('dcid')
+                if place_dcid:
+                    all_types_of_places.append(place_dcid)
     return all_types_of_places
 
 
 def places_to_geo_jsons(places: List[str]):
     """Get geojsons for a list of places."""
-    resp = dc.get_property_values(places, 'geoJsonCoordinates')
+    resp = dc_api.dc_api_get_node_property(places, 'geoJsonCoordinates')
     geojsons = {}
-    for p, gj in resp.items():
+    for p, gj_value in resp.items():
+        # dc_api_get_node_property() returns {dcid: {prop: value}} with string
+        # values quoted, so strip the quotes before parsing the GeoJSON.
+        gj = gj_value.get('geoJsonCoordinates', '')
         if not gj:
             continue
-        geojsons[p] = geometry.shape(json.loads(gj[0]))
+        geojsons[p] = geometry.shape(json.loads(gj.strip('"')))
diff --git a/scripts/rff/preprocess_raster.py b/scripts/rff/preprocess_raster.py
index 2aff529ae7..fbdf7fa836 100644
--- a/scripts/rff/preprocess_raster.py
+++ b/scripts/rff/preprocess_raster.py
@@ -10,6 +10,8 @@
 
 RFF_DIR = os.path.dirname(os.path.abspath(__file__))
 sys.path.append(RFF_DIR)
+sys.path.append(os.path.join(RFF_DIR, '../../util'))
+from dc_api_wrapper import dc_api_get_values
 from rff import util
 
 bandname_to_gdcStatVars = {
@@ -37,11 +39,11 @@ def get_dcid(sp_scale, lat, lon):
 
 def get_county_geoid(lat, lon):
     counties = dc.get_places_in(['country/USA'], 'County')['country/USA']
-    counties_simp = dc.get_property_values(counties, 'geoJsonCoordinatesDP1')
+    counties_simp = dc_api_get_values(counties, 'geoJsonCoordinatesDP1')
     point = geometry.Point(lon, lat)
     for p, gj in counties_simp.items():
         if len(gj) == 0:
-            gj = dc.get_property_values([p], 'geoJsonCoordinates')[p]
+            gj = dc_api_get_values([p], 'geoJsonCoordinates')[p]
             if len(gj) == 0:
                 # property not defined for one county in Alaska
                 continue
         if geometry.shape(json.loads(gj[0])).contains(point):
diff --git a/scripts/un/boundaries/country_boundaries.py b/scripts/un/boundaries/country_boundaries.py
index 9c519e7518..00ebc75d9c 100644
--- a/scripts/un/boundaries/country_boundaries.py
+++ b/scripts/un/boundaries/country_boundaries.py
@@ -23,16 +23,23 @@
 
 from typing import Dict
 
-import datacommons as dc
 import geopandas as gpd
 from geojson_rewind import rewind
 import json
 import os
+import sys
 import requests
 from absl import app
 from absl import flags
 
+_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
+sys.path.append(_SCRIPT_DIR)
+sys.path.append(os.path.join(_SCRIPT_DIR.split('/data/', 1)[0], 'data', 'util'))
+
+import dc_api_wrapper as dc_api
+from string_utils import str_to_list
+
 FLAGS = flags.FLAGS
 flags.DEFINE_string('input_file', 'data/UNGIS_BNDA.geojson',
                     'Input geojson file')
@@ -194,10 +201,10 @@ def existing_codes(self, all_countries):
 
         Only countries with DCID of the form country/{code} are included.
""" # Call DC API to get list of countries - dc_all_countries = dc.get_property_values(['Country'], - 'typeOf', - out=False, - limit=500)['Country'] + dc_all_countries = str_to_list( + dc_api.dc_api_get_property(['Country'], 'typeOf', + out=False).get('Country', + {}).get('typeOf', '')) dc_all_countries = set(dc_all_countries) def is_dc_country(iso): @@ -257,8 +264,10 @@ def build_cache(self, existing_codes): all_children.update(children) child2name = {} - for child, values in dc.get_property_values(list(all_children), - 'name').items(): + children_names = dc_api.dc_api_get_node_property( + list(all_children), 'name') + for child, prop_values in children_names.items(): + values = str_to_list(prop_values.get('name', '')) if values: child2name[child] = values[0] diff --git a/scripts/us_census/enhanced_tmcf/process_etmcf.py b/scripts/us_census/enhanced_tmcf/process_etmcf.py index fd51dbbf2d..d08567ddc2 100644 --- a/scripts/us_census/enhanced_tmcf/process_etmcf.py +++ b/scripts/us_census/enhanced_tmcf/process_etmcf.py @@ -1,6 +1,13 @@ import csv import datacommons as dc import os +import sys + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.join(_SCRIPT_DIR, '../../../util')) + +from dc_api_wrapper import dc_api_get_node_property +import datacommons as dc from absl import app from absl import flags @@ -70,9 +77,9 @@ def _get_places_not_found(census_geoids: List[str]) -> List[str]: for i in range(0, len(geo_ids), NUM_DCIDS_TO_QUERY): selected_geo_ids = geo_ids[i:i + NUM_DCIDS_TO_QUERY] selected_dcids = [geoId_to_dcids[g] for g in selected_geo_ids] - res = dc.get_property_values(selected_dcids, 'name') + res = dc_api_get_node_property(selected_dcids, 'name') for index in range(len(selected_dcids)): - if not res[selected_dcids[index]]: + if selected_dcids[index] not in res: geoIds_not_found.append(selected_geo_ids[index]) return geoIds_not_found diff --git a/scripts/us_census/geojsons_low_res/download.py b/scripts/us_census/geojsons_low_res/download.py index 0be1fdbd34..cbdb7359e4 100644 --- a/scripts/us_census/geojsons_low_res/download.py +++ b/scripts/us_census/geojsons_low_res/download.py @@ -20,6 +20,11 @@ import datacommons as dc import geojson import os +import sys + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.join(_SCRIPT_DIR, '../../../util')) +from dc_api_wrapper import dc_api_node_get_property class GeojsonDownloader: @@ -88,10 +93,11 @@ def download_data(self, place='country/USA', level=1): Raises: ValueError: If a Data Commons API call fails. """ - geolevel = dc.get_property_values([place], "typeOf") + place_types = dc_api.dc_api_node_get_property([place], "typeOf") + geo_level = str_to_list(place_types.get(place, {}).get("typeOf", "")) # There is an extra level of nesting in geojson files, so we have # to get the 0th element explicitly. 
-        assert len(geolevel[place]) == 1
-        geolevel = geolevel[place][0]
+        assert len(geo_level) >= 1
+        geolevel = geo_level[0]
 
         for i in range(level):
@@ -99,9 +107,18 @@
             if geolevel not in self.LEVEL_MAP:
                 raise ValueError("Desired level does not exist.")
             geolevel = self.LEVEL_MAP[geolevel]
 
-        geos_contained_in_place = dc.get_places_in([place], geolevel)[place]
-        self.geojsons = dc.get_property_values(geos_contained_in_place,
-                                               "geoJsonCoordinates")
+        dc_api_client = dc_api.get_datacommons_client()
+        descendant_places = dc_api.dc_api_batched_wrapper(
+            dc_api_client.node.fetch_place_descendants, [place], {
+                'descendants_type': geolevel
+            },
+            dcid_arg_kw='place_dcids').get(place, [])
+        geos_contained_in_place = [
+            place_node.get('dcid')
+            for place_node in descendant_places
+            if place_node.get('dcid')
+        ]
+
+        self.geojsons = dc_api_get_values(geos_contained_in_place,
+                                          "geoJsonCoordinates")
 
         for area, coords in self.iter_subareas():
             self.geojsons[area][0] = geojson.loads(coords)
diff --git a/scripts/us_epa/util/facilities_helper.py b/scripts/us_epa/util/facilities_helper.py
index 7662c36937..75ed450046 100644
--- a/scripts/us_epa/util/facilities_helper.py
+++ b/scripts/us_epa/util/facilities_helper.py
@@ -15,6 +15,7 @@
 
 import os
 import ssl
+import sys
 
 import datacommons
 import json
@@ -25,6 +26,13 @@
 from requests.structures import CaseInsensitiveDict
 from requests.exceptions import HTTPError
 
+_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
+sys.path.append(_SCRIPT_DIR)
+sys.path.append(os.path.join(_SCRIPT_DIR.split('/data/', 1)[0], 'data', 'util'))
+
+import dc_api_wrapper as dc_api
+from string_utils import str_to_list
+
 _COUNTY_CANDIDATES_CACHE = {}
 
 
@@ -116,11 +124,11 @@ def get_county_candidates(zcta):
         return _COUNTY_CANDIDATES_CACHE[zcta]
     candidate_lists = []
     for prop in ['containedInPlace', 'geoOverlaps']:
-        resp = datacommons.get_property_values([zcta],
-                                               prop,
-                                               out=True,
-                                               value_type='County')
-        candidate_lists.append(sorted(resp[zcta]))
+        resp = dc_api.dc_api_get_node_property([zcta],
+                                               prop,
+                                               out=True,
+                                               constraints={'typeOf': 'County'})
+        candidate_lists.append(
+            sorted(str_to_list(resp.get(zcta, {}).get(prop, ''))))
     _COUNTY_CANDIDATES_CACHE[zcta] = candidate_lists
     return candidate_lists
diff --git a/scripts/us_nces/common/dcid_mcf_existance.py b/scripts/us_nces/common/dcid_mcf_existance.py
index 9c3d807112..d806fdc0c8 100644
--- a/scripts/us_nces/common/dcid_mcf_existance.py
+++ b/scripts/us_nces/common/dcid_mcf_existance.py
@@ -14,7 +14,13 @@
 """
 Script to check if properties, DCIDs, or nodes exist in datacommons.org.
 """
-import datacommons
+import os
+import sys
+
+_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
+sys.path.append(os.path.join(_SCRIPT_DIR, '../../../../util'))
+
+from dc_api_wrapper import dc_api_is_defined_dcid
 
 
 def check_dcid_existence(nodes: list) -> dict:
@@ -28,17 +34,7 @@
 
     Returns:
         dict: Status dictionary.
""" # pylint: disable=protected-access - nodes_response = datacommons.get_property_values( - nodes, - "typeOf", - out=True, - value_type=None, - limit=datacommons.utils._MAX_LIMIT) + # pylint: disable=protected-access + node_status = dc_api_is_defined_dcid(nodes) # pylint: enable=protected-access - node_status = {} - for node, value in nodes_response.items(): - if value == []: - node_status[node] = False - else: - node_status[node] = True return node_status diff --git a/tools/statvar_importer/place/place_resolver.py b/tools/statvar_importer/place/place_resolver.py index 330677cb0f..79f78d18b1 100644 --- a/tools/statvar_importer/place/place_resolver.py +++ b/tools/statvar_importer/place/place_resolver.py @@ -861,6 +861,7 @@ def filter_by_pvs( lookup_dcids_by_prop = {p: [] for p in lookup_props} for dcid in dcids: + place_props = places.get(dcid, {}) for prop in lookup_props: value = place_props.get(prop, '') if not value: diff --git a/util/dc_api_wrapper.py b/util/dc_api_wrapper.py index 2e084eb766..e08e4c5f62 100644 --- a/util/dc_api_wrapper.py +++ b/util/dc_api_wrapper.py @@ -18,9 +18,6 @@ DC V2 API requires an environment variable set for DC_API_KEY. Please refer to https://docs.datacommons.org/api/python/v2 for more details. - -To use the legacy datacommons library module, set the config: - 'dc_api_version': 'V1' """ from collections import OrderedDict @@ -34,7 +31,6 @@ from absl import logging from datacommons_client.client import DataCommonsClient from datacommons_client.utils.error_handling import DCConnectionError, DCStatusError, APIError -import datacommons as dc import requests_cache _SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -42,30 +38,20 @@ from download_util import request_url -# Path for reconciliation API in the dc.utils._API_ROOT -# For more details, please refer to: -# https://github.com/datacommonsorg/reconciliation#usage -# Resolve Id -# https://api.datacommons.org/v1/recon/resolve/id -_DC_API_PATH_RESOLVE_ID = '/v1/recon/resolve/id' # Resolve latlng coordinate # https://api.datacommons.org/v2/resolve _DC_API_PATH_RESOLVE_COORD = '/v2/resolve' # Default API key for limited tests _DEFAULT_DC_API_KEY = 'AIzaSyCTI4Xz-UW_G2Q2RfknhcfdAnTHq5X5XuI' -_API_ROOT_LOCK = threading.Lock() _DEFAULT_API_ROOT = 'https://api.datacommons.org' -def dc_api_wrapper( - function, - args: dict, - retries: int = 3, - retry_secs: int = 1, - use_cache: bool = False, - api_root: str = None, -): +def dc_api_wrapper(function, + args: dict, + retries: int = 3, + retry_secs: int = 1, + use_cache: bool = False): """Wrapper for a DC API call with retries and caching. Returns the result from the DC APi call function. In case of errors, retries @@ -78,9 +64,6 @@ def dc_api_wrapper( retries: Number of retries in case of HTTP errors. retry_sec: Interval in seconds between retries for which caller is blocked. use_cache: If True, uses request cache for faster response. - api_root: The API server to use. Default is 'http://api.datacommons.org'. To - use autopush with more recent data, set it to - 'http://autopush.api.datacommons.org' Returns: The response from the DataCommons API call. @@ -104,23 +87,7 @@ def dc_api_wrapper( f'Invoking DC API {function}, #{attempt} with {args},' f' retries={retries}') - response = None - if api_root: - # All calls serialize here to prevent races while updating the - # global Data Commons API root. 
-                with _API_ROOT_LOCK:
-                    original_api_root = dc.utils._API_ROOT
-                    if api_root:
-                        dc.utils._API_ROOT = api_root
-                        logging.debug(
-                            f'Setting DC API root to {api_root} for {function}')
-                    try:
-                        response = function(**args)
-                    finally:
-                        dc.utils._API_ROOT = original_api_root
-            else:
-                response = function(**args)
+            response = function(**args)
             logging.debug(
                 f'Got API response {response} for {function}, {args}')
@@ -211,11 +178,7 @@
     api_result = {}
     index = 0
     num_dcids = len(dcids)
-    dc_api_root = config.get('dc_api_root', None)
-    if config.get('dc_api_version', 'V2') == 'V2':
-        # V2 API assumes api root is set in the function's client
-        dc_api_root = None
-    api_batch_size = config.get('dc_api_batch_size', dc.utils._MAX_LIMIT)
+    api_batch_size = config.get('dc_api_batch_size', 100)
     logging.debug(
         f'Calling DC API {function} on {len(dcids)} dcids in batches of'
         f' {api_batch_size} with args: {args}...')
@@ -232,7 +195,6 @@
             config.get('dc_api_retries', 3),
             config.get('dc_api_retry_secs', 5),
             config.get('dc_api_use_cache', False),
-            dc_api_root,
         )
         if batch_result:
             dc_api_merge_results(api_result, batch_result)
@@ -324,15 +286,6 @@ def dc_api_is_defined_dcid(dcids: list, config: dict = {}) -> dict:
     api_function = client.node.fetch_property_values
     args = {'properties': 'typeOf'}
     dcid_arg_kw = 'node_dcids'
-    if config.get('dc_api_version', 'V2') != 'V2':
-        # Set parameters for V1 API.
-        api_function = dc.get_property_values
-        args = {
-            'prop': 'typeOf',
-            'out': True,
-        }
-        dcid_arg_kw = 'dcids'
-
     api_result = dc_api_batched_wrapper(function=api_function,
                                         dcids=dcids,
                                         args=args,
                                         dcid_arg_kw=dcid_arg_kw,
                                         config=config)
@@ -348,33 +301,31 @@
     return response
 
 
-def dc_api_get_node_property(dcids: list, prop: str, config: dict = {}) -> dict:
+def dc_api_get_node_property(dcids: list,
+                             prop: str,
+                             out: bool = True,
+                             constraints: dict = {},
+                             config: dict = {}) -> dict:
     """Returns a dictionary keyed by dcid with { prop: value } for each dcid.
 
-    Uses the get_property_values() DC API to lookup the property for each dcid.
+    Uses the V2 node API fetch_property_values() to look up the property for
+    each dcid.
 
     Args:
       dcids: List of dcids. The namespace is stripped from the dcid.
+      prop: property to be looked up.
+      out: If True, looks up values of the property for the given dcids.
+        If False, looks up nodes that refer to the given dcids via the property
+        (incoming arcs).
       config: dictionary of configuration parameters for the wrapper. See
         dc_api_batched_wrapper and dc_api_wrapper for details.
 
     Returns:
-      dictionary with each input dcid mapped to a True/False value.
+      dictionary with each input dcid mapped to a {prop: value} dictionary,
+      where multiple values are joined into a comma separated string.
     """
-    is_v2 = config.get('dc_api_version', 'V2') == 'V2'
     # Set parameters for V2 node API.
     client = get_datacommons_client(config)
     api_function = client.node.fetch_property_values
-    args = {'properties': prop}
+    args = {'properties': prop, 'out': out, 'constraints': constraints}
     dcid_arg_kw = 'node_dcids'
-    if not is_v2:
-        # Set parameters for V1 API.
-        api_function = dc.get_property_values
-        args = {
-            'prop': prop,
-            'out': True,
-        }
-        dcid_arg_kw = 'dcids'
 
     api_result = dc_api_batched_wrapper(function=api_function,
                                         dcids=dcids,
                                         args=args,
                                         dcid_arg_kw=dcid_arg_kw,
                                         config=config)
@@ -387,23 +338,19 @@
         if not node_data:
             continue
 
-        if is_v2:
-            values = []
-            arcs = node_data.get('arcs', {})
-            prop_nodes = arcs.get(prop, {}).get('nodes', [])
-            for node in prop_nodes:
-                val_dcid = node.get('dcid')
-                if val_dcid:
-                    values.append(val_dcid)
-                value = node.get('value')
-                if value:
-                    value = '"' + value + '"'
-                    values.append(value)
-            if values:
-                response[dcid] = {prop: ','.join(values)}
-        else:  # V1
-            if node_data:
-                response[dcid] = {prop: node_data}
+        values = []
+        arcs = node_data.get('arcs', {})
+        prop_nodes = arcs.get(prop, {}).get('nodes', [])
+        for node in prop_nodes:
+            # A node is either a reference (dcid) or a literal value.
+            val_dcid = node.get('dcid')
+            if val_dcid:
+                values.append(val_dcid)
+            value = node.get('value')
+            if value:
+                # Quote literal values so they survive the comma join.
+                value = '"' + value + '"'
+                values.append(value)
+        if values:
+            response[dcid] = {prop: ','.join(values)}
     return response
 
 
@@ -419,8 +366,6 @@ def dc_api_get_node_property_values(dcids: list, config: dict = {}) -> dict:
      dictionary with each dcid with the namespace 'dcid:' as the key
       mapped to a dictionary of property:value.
     """
-    if config.get('dc_api_version', 'V2') != 'V2':
-        return dc_api_v1_get_node_property_values(dcids, config)
     # Lookup node properties using the V2 node API.
     client = get_datacommons_client(config)
     api_function = client.node.fetch
@@ -454,42 +399,6 @@
 
 
-def dc_api_v1_get_node_property_values(dcids: list, config: dict = {}) -> dict:
-    """Returns all the property values for a set of dcids from the DC V1 API.
-
-    Args:
-      dcids: list of dcids to lookup
-      config: configuration parameters for the wrapper. See
-        dc_api_batched_wrapper() and dc_api_wrapper() for details.
-
-    Returns:
-      dictionary with each dcid with the namspace 'dcid:' as the key
-      mapped to a dictionary of property:value.
- """ - predefined_nodes = OrderedDict() - api_function = dc.get_triples - api_triples = dc_api_batched_wrapper(api_function, dcids, {}, config=config) - if api_triples: - for dcid, triples in api_triples.items(): - if (_strip_namespace(dcid) not in dcids and - _add_namespace(dcid) not in dcids): - continue - pvs = {} - for d, prop, val in triples: - if d == dcid and val: - # quote string values with spaces if needed - if ' ' in val and val[0] != '"': - val = '"' + val + '"' - if prop in pvs: - val = pvs[prop] + ',' + val - pvs[prop] = val - if len(pvs) > 0: - if 'Node' not in pvs: - pvs['Node'] = _add_namespace(dcid) - predefined_nodes[_add_namespace(dcid)] = pvs - return predefined_nodes - - def dc_api_resolve_placeid(dcids: list, in_prop: str = 'placeId', *, @@ -507,48 +416,25 @@ def dc_api_resolve_placeid(dcids: list, """ if not config: config = {} - if config.get('dc_api_version', 'V2') == 'V2': - client = get_datacommons_client(config) - api_function = client.resolve.fetch - args = {'expression': f'<-{in_prop}->dcid'} - dcid_arg_kw = 'node_ids' - api_result = dc_api_batched_wrapper(function=api_function, - dcids=dcids, - args=args, - dcid_arg_kw=dcid_arg_kw, - config=config) - results = {} - if api_result: - for node in api_result.get('entities', []): - place_id = node.get('node') - if place_id: - candidates = node.get('candidates', []) - if candidates: - dcid = candidates[0].get('dcid') - if dcid: - results[place_id] = dcid - return results - - # V1 implementation - api_root = config.get('dc_api_root', _DEFAULT_API_ROOT) - data = {'in_prop': in_prop, 'out_prop': 'dcid'} - data['ids'] = dcids - num_ids = len(dcids) - api_url = api_root + _DC_API_PATH_RESOLVE_ID - logging.debug( - f'Looking up {api_url} dcids for {num_ids} placeids: {data["ids"]}') - recon_resp = request_url(url=api_url, - params=data, - method='POST', - output='json') - # Extract the dcid for each place from the response + client = get_datacommons_client(config) + api_function = client.resolve.fetch + args = {'expression': f'<-{in_prop}->dcid'} + dcid_arg_kw = 'node_ids' + api_result = dc_api_batched_wrapper(function=api_function, + dcids=dcids, + args=args, + dcid_arg_kw=dcid_arg_kw, + config=config) results = {} - if recon_resp: - for entity in recon_resp.get('entities', []): - place_id = entity.get('inId', '') - out_dcids = entity.get('outIds', None) - if place_id and out_dcids: - results[place_id] = out_dcids[0] + if api_result: + for node in api_result.get('entities', []): + place_id = node.get('node') + if place_id: + candidates = node.get('candidates', []) + if candidates: + dcid = candidates[0].get('dcid') + if dcid: + results[place_id] = dcid return results diff --git a/util/latlng_recon_geojson.py b/util/latlng_recon_geojson.py index 9a301e543e..b0a89e01fa 100644 --- a/util/latlng_recon_geojson.py +++ b/util/latlng_recon_geojson.py @@ -22,6 +22,7 @@ import logging import time import urllib +from dc_api_wrapper import dc_api_get_values _WORLD = 'Earth' _USA = 'country/USA' @@ -39,7 +40,7 @@ def _get_geojsons(place_type, parent_place, retry=0): try: places = dc.get_places_in([parent_place], place_type)[parent_place] - resp = dc.get_property_values(places, _GJ_PROP[place_type]) + resp = dc_api_get_values(places, _GJ_PROP[place_type]) geojsons = {} for p, gj in resp.items(): if not gj: diff --git a/util/string_utils.py b/util/string_utils.py new file mode 100644 index 0000000000..c4ef88b9d8 --- /dev/null +++ b/util/string_utils.py @@ -0,0 +1,76 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache 
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""String utilities."""
+
+import csv
+
+from absl import logging
+
+
+def str_to_list(value: str) -> list:
+    """Returns the value as a list.
+
+    Args:
+      value: string with a single value or a comma separated list of values.
+
+    Returns:
+      the value as a list.
+    """
+    if isinstance(value, list):
+        return value
+    value_list = []
+    # Read the string as a comma separated line.
+    is_quoted = '"' in value
+    try:
+        if is_quoted and ',' in value:
+            # Read the string as a quoted comma separated line.
+            row = list(
+                csv.reader([value],
+                           delimiter=',',
+                           quotechar='"',
+                           skipinitialspace=True))[0]
+        else:
+            # Without quotes, the line can be split on commas.
+            # Avoid csv reader calls for performance.
+            row = value.split(',')
+        for v in row:
+            val_normalized = to_quoted_string(v, is_quoted=is_quoted)
+            value_list.append(val_normalized)
+    except csv.Error:
+        logging.error(
+            f'Too large value {len(value)}, failed to convert to list')
+        value_list = [value]
+    return value_list
+
+
+def to_quoted_string(value: str, is_quoted: bool = None) -> str:
+    """Returns a quoted string if the value has spaces or special characters.
+
+    Args:
+      value: string value to be quoted if necessary.
+      is_quoted: if True, returns the value as a quoted string.
+
+    Returns:
+      the value with optional double quotes.
+    """
+    if not value or not isinstance(value, str):
+        return value
+
+    value = value.strip('"')
+    value = value.strip()
+    if value.startswith('[') and value.endswith(']'):
+        # Quantity ranges such as '[10 20 Years]' are returned unquoted.
+        return value
+    if value and (' ' in value or ',' in value or is_quoted):
+        if value and value[0] != '"':
+            return '"' + value + '"'
+    return value
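
Note for reviewers: a quick, illustrative sketch of the expected behavior of the
new string_utils helpers (not part of the diff; outputs inferred from the
implementation above):

    from string_utils import str_to_list, to_quoted_string

    str_to_list('dcid:geoId/06,dcid:geoId/48')
    # -> ['dcid:geoId/06', 'dcid:geoId/48']

    str_to_list('"United States",dcid:country/CAN')
    # -> ['"United States"', '"dcid:country/CAN"']
    # (a quote anywhere in the input marks the line as quoted, so every
    #  value in it is returned quoted)

    to_quoted_string('New York')
    # -> '"New York"'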