Source code for mdf_forge.forge

import os
import re
from urllib.parse import urlparse

import globus_sdk
import mdf_toolbox
import requests


from tqdm import tqdm

# Maximum recommended number of HTTP file transfers
#  Large transfers are much better suited for Globus Transfer
HTTP_NUM_LIMIT = 50


class Forge(mdf_toolbox.AggregateHelper, mdf_toolbox.SearchHelper):
    """Forge fetches metadata and files from the Materials Data Facility.

    Forge is intended to be the best way to access MDF data for all users.
    An internal Query object is used to make queries. From the user's perspective,
    an instantiation of Forge will black-box searching.
    """
    # "Private" variables
    __default_index = "mdf"
    __scroll_field = "mdf.scroll_id"
    __auth_services = ["data_mdf", "transfer", "search", "petrel"]
    __anon_services = ["search"]
    __app_name = "MDF_Forge"
    __client_id = "b2b437c4-17c1-4e4b-8f15-e9783e1312d7"
    __transfer_interval = 60  # 1 minute, in seconds
    __inactivity_time = 1 * 60 * 60  # 1 hour, in seconds

    # "Protected" variables (for dev/debugging)
    _schemas_url = "https://api.materialsdatafacility.org/schemas/"
    _organizations_url = "https://api.materialsdatafacility.org/organizations/"
    def __init__(self, index=__default_index, local_ep=None, anonymous=False,
                 clear_old_tokens=False, **kwargs):
        """Create an MDF Forge Client.

        Arguments:
            index (str): The Search index to search on. **Default:** ``"mdf"``.
            local_ep (str): The endpoint ID of the local Globus Connect Personal endpoint.
                    If needed but not provided, the local endpoint will be autodetected
                    if possible.
            anonymous (bool): If ``True``, will not authenticate with Globus Auth.
                    If ``False``, will require authentication. **Default:** ``False``.

                    Caution:
                        Authentication is required for some Forge functionality,
                        including viewing private datasets and using Globus Transfer.
            clear_old_tokens (bool): If ``True``, will force reauthentication.
                    If ``False``, will use existing tokens if possible.
                    Has no effect if ``anonymous`` is ``True``. **Default:** ``False``.

        Keyword Arguments:
            services (list of str): *Advanced users only.* The services to authenticate with,
                    using Toolbox. An empty list will disable authenticating with Toolbox.
                    Note that even overwriting clients (with other keyword arguments)
                    does not stop Toolbox authentication.
                    Only a blank ``services`` argument will disable Toolbox authentication.
            search_client (globus_sdk.SearchClient): An authenticated SearchClient
                    to overwrite the default.
            transfer_client (globus_sdk.TransferClient): An authenticated TransferClient
                    to override the default.
            data_mdf_authorizer (globus_sdk.GlobusAuthorizer): An authenticated GlobusAuthorizer
                    to overwrite the default for accessing the MDF NCSA endpoint.
            petrel_authorizer (globus_sdk.GlobusAuthorizer): An authenticated GlobusAuthorizer
                    to override the default.
        """
        self.__anonymous = anonymous
        self.local_ep = local_ep

        if self.__anonymous:
            services = kwargs.get('services', self.__anon_services)
            clients = (mdf_toolbox.anonymous_login(services) if services else {})
        else:
            services = kwargs.get('services', self.__auth_services)
            if services:
                clients = mdf_toolbox.login(services=services, app_name=self.__app_name,
                                            client_id=self.__client_id,
                                            clear_old_tokens=clear_old_tokens)
            else:
                clients = {}
        search_client = kwargs.pop("search_client", clients.get("search", None))
        self.__transfer_client = kwargs.get("transfer_client", clients.get("transfer", None))
        self.__data_mdf_authorizer = kwargs.get("data_mdf_authorizer",
                                                clients.get("data_mdf",
                                                            globus_sdk.NullAuthorizer()))
        self.__petrel_authorizer = kwargs.get("petrel_authorizer",
                                              clients.get("petrel",
                                                          globus_sdk.NullAuthorizer()))

        super().__init__(index=index, search_client=search_client,
                         scroll_field=self.__scroll_field, **kwargs)
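    # Usage sketch (illustrative, not part of the original module): constructing clients.
    # An authenticated client triggers a Globus Auth login flow on first use; an
    # anonymous client skips login but cannot view private data or use Globus Transfer.
    #
    #     from mdf_forge.forge import Forge
    #     mdf = Forge()                      # authenticated, default "mdf" index
    #     mdf_anon = Forge(anonymous=True)   # anonymous, search-only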
    # ***********************************************
    # * Field-specific helpers
    # ***********************************************
    def match_source_names(self, source_names):
        """Add sources to match to the query.

        Arguments:
            source_names (str or list of str): The ``source_name`` values to match.
                    ``source_id`` values are also accepted, but are matched without
                    the additional version information they have.

        Returns:
            Forge: Self
        """
        # If no source_names are supplied, nothing to match
        if not source_names:
            return self
        if isinstance(source_names, str):
            source_names = [source_names]
        # If passed source_ids, strip version info
        sanitized_names = []
        for src in source_names:
            match = re.search("_v[0-9]+\\.[0-9]+$", src)
            if match:
                sanitized_names.append(src[:match.start()])
            else:
                sanitized_names.append(src)
        source_names = sanitized_names

        # First source should be in new group and required
        self.match_field(field="mdf.source_name", value=source_names[0],
                         required=True, new_group=True)
        # Other sources should stay in that group, and not be required
        for src in source_names[1:]:
            self.match_field(field="mdf.source_name", value=src,
                             required=False, new_group=False)
        return self
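    # Usage sketch (illustrative; the dataset names below are examples only, assuming
    # an existing Forge instance `mdf`):
    #
    #     mdf.match_source_names(["example_dataset", "example_dataset_v1.1"])  # version stripped
    #     results = mdf.search(limit=10)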
    def match_records(self, source_name, scroll_ids):
        """Match specific records from a given dataset.
        Multiple records may be matched, but only one dataset per call.

        Arguments:
            source_name (str): The ``source_name`` of the records' dataset.
                    The ``source_id`` is also accepted for convenience.
            scroll_ids (int or list of int): The ``scroll_id`` values of the records to match.

        Returns:
            Forge: self
        """
        if not source_name or not scroll_ids:
            return self
        if isinstance(scroll_ids, int):
            scroll_ids = [scroll_ids]
        # If passed source_id, strip version info
        match = re.search("_v[0-9]+\\.[0-9]+$", source_name)
        if not match:
            match = (re.search("_v[0-9]+-[0-9]+$", source_name)
                     or re.search("_v[0-9]+$", source_name))
        if match:
            source_name = source_name[:match.start()]

        # source_name is required, starts new group
        # First scroll is (nested) new required group
        # (source:source AND (scroll:scroll0 OR scroll:scroll1 ... ))
        self.match_field(field="mdf.source_name", value=source_name,
                         required=True, new_group=True)
        self.match_field(field="mdf.scroll_id", value=scroll_ids[0],
                         required=True, new_group=True)
        for scroll in scroll_ids[1:]:
            self.match_field(field="mdf.scroll_id", value=scroll,
                             required=False, new_group=False)
        return self
    def match_elements(self, elements, match_all=True):
        """Add elemental abbreviations to the query.

        Arguments:
            elements (str or list of str): The elements to match.
                    For example, ``"Fe"`` for iron.
            match_all (bool): If ``True``, will add with ``AND``.
                    If ``False``, will use ``OR``. **Default:** ``True``.

        Returns:
            Forge: Self
        """
        # If no elements are supplied, nothing to match
        if not elements:
            return self
        if isinstance(elements, str):
            elements = [elements]

        # First element should be in new group and required
        self.match_field(field="material.elements", value=elements[0],
                         required=True, new_group=True)
        # Other elements should stay in that group
        for element in elements[1:]:
            self.match_field(field="material.elements", value=element,
                             required=match_all, new_group=False)
        return self
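    # Usage sketch (illustrative, assuming a Forge instance `mdf`): chaining an element
    # match with a search. With match_all=True both elements must appear in a record.
    #
    #     results = mdf.match_elements(["Al", "Cu"], match_all=True).search(limit=10)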
    def match_titles(self, titles):
        """Add titles to the query.

        Arguments:
            titles (str or list of str): The titles to match.

        Returns:
            Forge: Self
        """
        if not titles:
            return self
        if not isinstance(titles, list):
            titles = [titles]

        self.match_field(field="dc.titles.title", value=titles[0],
                         required=True, new_group=True)
        for title in titles[1:]:
            self.match_field(field="dc.titles.title", value=title,
                             required=False, new_group=False)
        return self
    def match_years(self, years=None, start=None, stop=None, inclusive=True):
        """Add years and limits to the query.

        Arguments:
            years (int or string, or list of int or strings): The years to match.
                    Note that this argument overrides the start, stop, and inclusive arguments.
            start (int or string): The lower range of years to match.
            stop (int or string): The upper range of years to match.
            inclusive (bool): If ``True``, the start and stop values will be included
                    in the search. If ``False``, they will be excluded.
                    **Default:** ``True``.

        Returns:
            Forge: Self
        """
        # If nothing supplied, nothing to match
        if years is None and start is None and stop is None:
            return self

        # No filtering if no filters provided
        if years is not None and years != []:
            if not isinstance(years, list):
                years = [years]
            years_int = []
            for year in years:
                try:
                    y_int = int(year)
                    years_int.append(y_int)
                except ValueError:
                    raise AttributeError("Invalid year: '{}'".format(year))

            # Only match years if valid years were supplied
            if len(years_int) > 0:
                self.match_field(field="dc.publicationYear", value=years_int[0],
                                 required=True, new_group=True)
                for year in years_int[1:]:
                    self.match_field(field="dc.publicationYear", value=year,
                                     required=False, new_group=False)
        else:
            if start is not None:
                try:
                    start = int(start)
                except ValueError:
                    raise AttributeError("Invalid start year: '{}'".format(start))
            if stop is not None:
                try:
                    stop = int(stop)
                except ValueError:
                    raise AttributeError("Invalid stop year: '{}'".format(stop))

            self.match_range(field="dc.publicationYear", start=start, stop=stop,
                             inclusive=inclusive, required=True, new_group=True)
        return self
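    # Usage sketch (illustrative, assuming a Forge instance `mdf`): discrete years
    # versus an exclusive year range.
    #
    #     mdf.match_years([2015, 2016]).search(limit=5)
    #     mdf.match_years(start=2010, stop=2015, inclusive=False).search(limit=5)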
    def match_resource_types(self, types):
        """Match the given resource types.

        Arguments:
            types (str or list of str): The ``resource_type`` values to match.

        Returns:
            Forge: Self
        """
        # If no types, nothing to match
        if not types:
            return self
        if isinstance(types, str):
            types = [types]

        # First type should be in new group and required
        self.match_field(field="mdf.resource_type", value=types[0],
                         required=True, new_group=True)
        # Other types should stay in that group, and not be required
        for rt in types[1:]:
            self.match_field(field="mdf.resource_type", value=rt,
                             required=False, new_group=False)
        return self
    def match_organizations(self, organizations, match_all=True):
        """Match the given Organizations.
        Organizations are MDF-registered groups that can apply rules to datasets.

        Arguments:
            organizations (str or list of str): The organizations to match.
            match_all (bool): If ``True``, will add with ``AND``.
                    If ``False``, will use ``OR``. **Default:** ``True``.

        Returns:
            Forge: Self
        """
        # If no orgs, nothing to match
        if not organizations:
            return self
        if isinstance(organizations, str):
            organizations = [organizations]

        # First org should be in new group and required
        self.match_field(field="mdf.organizations", value=organizations[0],
                         required=True, new_group=True)
        # Other organizations should stay in that group
        for org in organizations[1:]:
            self.match_field(field="mdf.organizations", value=org,
                             required=match_all, new_group=False)
        return self
    def match_dois(self, dois):
        """Match the given Digital Object Identifiers.

        Arguments:
            dois (str or list of str): DOIs to match and return.

        Returns:
            Forge: self
        """
        if not dois:
            return self
        if isinstance(dois, str):
            dois = [dois]

        # First DOI should be in new group and required
        self.match_field(field="dc.identifier.identifier", value=dois[0],
                         required=True, new_group=True)
        # Other DOIs should stay in that group, and not be required
        for doi in dois[1:]:
            self.match_field(field="dc.identifier.identifier", value=doi,
                             required=False, new_group=False)
        return self
    # ***********************************************
    # * Premade searches
    # ***********************************************
    def search_by_elements(self, elements, source_names=[], index=None, limit=None,
                           match_all=True, info=False):
        """Execute a search for the given elements in the given sources.

        ``search_by_elements([x], [y])`` is equivalent to
        ``match_elements([x]).match_source_names([y]).search()``.

        Note:
            This method will use terms from the current query, and resets the current query.

        Arguments:
            elements (list of str): The elements to match. For example, ``"Fe"`` for iron.
            source_names (list of str): The ``source_name`` values to match.
                    **Default:** ``[]``.
            index (str): The Search index to search on. **Default:** The current index.
            limit (int): The maximum number of results to return.
                    The max for this argument is the ``SEARCH_LIMIT`` imposed by Globus Search.
                    **Default:** ``SEARCH_LIMIT``.
            match_all (bool): If ``True``, will add elements with ``AND``.
                    If ``False``, will use ``OR``. **Default:** ``True``.
            info (bool): If ``False``, search will return a list of the results.
                    If ``True``, search will return a tuple containing the results list
                    and other information about the query. **Default:** ``False``.

        Returns:
            If ``info`` is ``False``, *list*: The search results.
            If ``info`` is ``True``, *tuple*: The search results,
            and a dictionary of query information.
        """
        return (self.match_elements(elements, match_all=match_all)
                    .match_source_names(source_names)
                    .search(limit=limit, info=info))
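    # Usage sketch (illustrative, assuming a Forge instance `mdf`; the source_name
    # value is an example only):
    #
    #     hits, info = mdf.search_by_elements(["Fe", "O"], source_names=["example_dataset"],
    #                                         match_all=True, info=True)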
    def search_by_titles(self, titles, index=None, limit=None, info=False):
        """Execute a search for the given titles.

        ``search_by_titles([x])`` is equivalent to ``match_titles([x]).search()``

        Note:
            This method will use terms from the current query, and resets the current query.

        Arguments:
            titles (list of str): The titles to match.
            index (str): The Search index to search on. **Default:** The current index.
            limit (int): The maximum number of results to return.
                    The max for this argument is the ``SEARCH_LIMIT`` imposed by Globus Search.
                    **Default:** ``SEARCH_LIMIT``.
            info (bool): If ``False``, search will return a list of the results.
                    If ``True``, search will return a tuple containing the results list
                    and other information about the query. **Default:** ``False``.

        Returns:
            If ``info`` is ``False``, *list*: The search results.
            If ``info`` is ``True``, *tuple*: The search results,
            and a dictionary of query information.
        """
        return self.match_titles(titles).search(limit=limit, info=info)
    def search_by_dois(self, dois, index=None, limit=None, info=False):
        """Execute a search for the given Digital Object Identifiers.

        ``search_by_dois([x])`` is equivalent to ``match_dois([x]).search()``

        Note:
            This method will use terms from the current query, and resets the current query.

        Arguments:
            dois (list of str): The DOIs to find.
            index (str): The Search index to search on. **Default:** The current index.
            limit (int): The maximum number of results to return.
                    The max for this argument is the ``SEARCH_LIMIT`` imposed by Globus Search.
                    **Default:** ``SEARCH_LIMIT``.
            info (bool): If ``False``, search will return a list of the results.
                    If ``True``, search will return a tuple containing the results list
                    and other information about the query. **Default:** ``False``.

        Returns:
            If ``info`` is ``False``, *list*: The search results.
            If ``info`` is ``True``, *tuple*: The search results,
            and a dictionary of query information.
        """
        return self.match_dois(dois).search(limit=limit, info=info)
    def aggregate_sources(self, source_names, index=None):
        """Aggregate all records with the given ``source_name`` values.
        There is no limit to the number of results returned.
        Please beware of aggregating very large datasets.

        Caution:
            It is recommended that you check how many entries will be returned from your
            chosen datasets by running ``match_source_names(source_names).search(limit=0,
            info=True)`` before using ``aggregate_sources()``.

        Note:
            This method will use terms from the current query, and resets the current query.

        Arguments:
            source_names (str or list of str): The ``source_name`` values to aggregate.
            index (str): The Search index to search on. **Default:** The current index.

        Returns:
            list of dict: All of the entries from the ``source_name`` matches.
        """
        return self.match_source_names(source_names).aggregate(index=index)
    def fetch_datasets_from_results(self, entries=None, query=None, reset_query=True):
        """Retrieve the dataset entries for given records.
        Note that this method may use the current query.

        Note:
            This method will use terms from the current query, and resets the current query.

        Arguments:
            entries (dict, list of dict, or tuple of dict): The records to parse to find
                    the datasets. This argument can be a single entry, a list of entries,
                    or a tuple with a list of entries. The latter two options support both
                    return values of the ``search()`` method. If entries is ``None``,
                    the current query is executed and those results are used instead.
                    **Default:** ``None``.
            query (str): If not ``None``, search for entries using this query instead of
                    the current query. Has no effect if ``entries`` is not ``None``.
                    **Default:** ``None``.
            reset_query (bool): Has no effect unless ``entries`` and ``query`` are both ``None``.
                    If ``True``, will reset the current query after searching for entries.
                    If ``False``, will not reset the current query. **Default:** ``True``.

        Returns:
            list: The dataset entries.
        """
        if entries is None:
            entries = self.search(q=query, reset_query=(query is None or reset_query))
        # If entries is not a list of dict, make it one
        if isinstance(entries, dict):
            entries = [entries]
        elif isinstance(entries, tuple):
            entries = entries[0]
        # If no entries, error
        if len(entries) == 0:
            raise ValueError("No entries provided or found")
        # Extract source_name from every entry, make unique, skip invalid entries
        ds_ids = set([entry["mdf"]["source_name"] for entry in entries
                      if entry.get("mdf", {}).get("source_name")])
        if not ds_ids:
            return []
        return self.match_source_names(ds_ids).match_resource_types("dataset").search()
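    # Usage sketch (illustrative, assuming a Forge instance `mdf`): look up the parent
    # dataset entries for a set of record results.
    #
    #     records = mdf.match_elements("Si").search(limit=20)
    #     datasets = mdf.fetch_datasets_from_results(records)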
    def get_dataset_version(self, source_name):
        """Get the version of a certain dataset.

        Arguments:
            source_name (string): The ``source_name`` of the dataset.

        Returns:
            int: Version of the dataset in question.
        """
        hits = self.search("mdf.source_name:{} AND"
                           " mdf.resource_type:dataset".format(source_name),
                           advanced=True, limit=2)

        # Some error checking
        if len(hits) == 0:
            raise ValueError("No such dataset found: " + source_name)
        elif len(hits) > 1:
            raise ValueError("Unexpectedly matched multiple datasets with source_name '{}'. "
                             "Please contact MDF support.".format(source_name))
        else:
            return hits[0]['mdf']['version']
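    # Usage sketch (illustrative; "example_dataset" is a hypothetical source_name):
    #
    #     version = mdf.get_dataset_version("example_dataset")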
    # ***********************************************
    # * Data retrieval functions
    # ***********************************************
    def http_download(self, results, dest=".", preserve_dir=False, verbose=True):
        """Download data files from the provided results using HTTPS.
        For a large number of files, you should use ``globus_download()`` instead,
        which uses Globus Transfer.

        Arguments:
            results (dict): The records from which files should be fetched.
                    This should be the return value of a search method.
            dest (str): The destination path for the data files on the local machine.
                    **Default:** The current directory.
            preserve_dir (bool): If ``True``, the directory structure for the data files
                    will be recreated at the destination. If ``False``, only the data files
                    themselves will be saved. **Default:** ``False``.
            verbose (bool): If ``True``, status and progress messages will be printed.
                    If ``False``, only error messages will be printed. **Default:** ``True``.

        Returns:
            *dict*: The status information for the download:

                * **success** (*bool*): ``True`` if the download succeeded.
                  ``False`` if it failed.
                * **message** (*str*): The error message, if the download failed.
        """
        if self.__anonymous:
            print("Error: Anonymous HTTP download not yet supported.")
            return {
                "success": False,
                "message": "Anonymous HTTP download not yet supported."
            }
        # If user submitted single result, make into list
        if isinstance(results, dict):
            results = [results]
        # If results have info attached, remove it
        elif isinstance(results, tuple):
            results = results[0]
        if len(results) > HTTP_NUM_LIMIT:
            print("Error: Too many results supplied. Use globus_download()"
                  + " for fetching more than " + str(HTTP_NUM_LIMIT) + " entries.")
            return {
                "success": False,
                "message": ("Too many results supplied. Use globus_download()"
                            + " for fetching more than " + str(HTTP_NUM_LIMIT) + " entries.")
            }
        for res in tqdm(results, desc="Fetching files", disable=(not verbose)):
            if res["mdf"]["resource_type"] == "dataset":
                print("Skipping dataset entry for '{}': Cannot download dataset over HTTPS. "
                      "Use globus_download() for datasets.".format(res["mdf"]["source_id"]))
            elif res["mdf"]["resource_type"] == "record":
                for dl in res.get("files", []):
                    url = dl.get("url", None)
                    if url:
                        parsed_url = urlparse(url)
                        remote_path = parsed_url.path
                        # local_path should be either dest + whole path or dest + filename
                        if preserve_dir:
                            local_path = os.path.normpath(dest + "/" + remote_path)
                        else:
                            local_path = os.path.normpath(
                                dest + "/" + os.path.basename(remote_path))
                        # Make dirs for storing the file if they don't exist
                        # preserve_dir doesn't matter; local_path has accounted for it already
                        try:
                            os.makedirs(os.path.dirname(local_path))
                        # If dest is current dir and preserve_dir=False, there are no dirs
                        # to make. os.makedirs() will raise FileNotFoundError
                        # (Python3 subclass of IOError). Since it means all dirs required
                        # exist, it can be swallowed.
                        except (IOError, OSError):
                            pass
                        # Check if file already exists, change filename if necessary
                        collisions = 0
                        while os.path.exists(local_path):
                            # Save and remove extension
                            local_path, ext = os.path.splitext(local_path)
                            # Check if already added number to end
                            old_add = "(" + str(collisions) + ")"
                            collisions += 1
                            new_add = "(" + str(collisions) + ")"
                            # Remove old number if exists
                            if local_path.endswith(old_add):
                                local_path = local_path[:-len(old_add)]
                            # Add new number
                            local_path = local_path + new_add + ext

                        headers = {}
                        # Check for Petrel vs. NCSA url for authorizer
                        # Petrel
                        if parsed_url.netloc == ("e38ee745-6d04-11e5-ba46-22000b92c6ec"
                                                 ".e.globus.org"):
                            authorizer = self.__petrel_authorizer
                        elif parsed_url.netloc == "data.materialsdatafacility.org":
                            authorizer = self.__data_mdf_authorizer
                        else:
                            authorizer = globus_sdk.NullAuthorizer()
                        authorizer.set_authorization_header(headers)
                        response = requests.get(url, headers=headers)
                        # Handle first 401 by regenerating auth headers
                        if response.status_code == 401:
                            authorizer.handle_missing_authorization()
                            authorizer.set_authorization_header(headers)
                            response = requests.get(url, headers=headers)
                        # Handle other errors by passing the buck to the user
                        if response.status_code != 200:
                            print("Error {} when attempting to access "
                                  "'{}'".format(response.status_code, url))
                        else:
                            # Write out the binary response content
                            with open(local_path, 'wb') as output:
                                output.write(response.content)
            else:
                print("Error: Found unknown resource_type '{}'. "
                      "Skipping entry.".format(res["mdf"]["resource_type"]))
        return {
            "success": True
        }
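    # Usage sketch (illustrative, assuming an authenticated Forge instance `mdf` and an
    # example source_name): small result sets can be fetched over HTTPS; anything larger
    # than HTTP_NUM_LIMIT should go through globus_download().
    #
    #     results = mdf.match_source_names("example_dataset").search(limit=10)
    #     status = mdf.http_download(results, dest="./downloads", preserve_dir=True)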
    def globus_download(self, results, dest=".", dest_ep=None, preserve_dir=False,
                        inactivity_time=None, download_datasets=False, verbose=True):
        """Download data files from the provided results using Globus Transfer.
        This method requires Globus Connect to be installed on the destination endpoint.

        Arguments:
            results (dict): The records from which files should be fetched.
                    This should be the return value of a search method.
            dest (str): The destination path for the data files on the local machine.
                    **Default:** The current directory.
            dest_ep (str): The destination endpoint ID.
                    **Default:** The autodetected local GCP.
            preserve_dir (bool): If ``True``, the directory structure for the data files
                    will be recreated at the destination. The path to the new files will be
                    relative to the ``dest`` path. If ``False``, only the data files
                    themselves will be saved. **Default:** ``False``.
            inactivity_time (int): Number of seconds the Transfer is allowed to go
                    without progress before being cancelled.
                    **Default:** ``self.__inactivity_time``.
            download_datasets (bool): If ``True``, will download the full dataset for any
                    dataset entries given. If ``False``, will skip dataset entries
                    with a notification. **Default:** ``False``.

                    Caution:
                        Datasets can be large. Additionally, if you do not filter out records
                        from a dataset you provide, you may end up with duplicate files.
                        Use with care.
            verbose (bool): If ``True``, status and progress messages will be printed,
                    and errors will prompt for continuation confirmation.
                    If ``False``, only error messages will be printed,
                    and the Transfer will always continue. **Default:** ``True``.

        Returns:
            list of str: The task IDs of the Globus transfers.
        """
        if self.__anonymous:
            print("Error: Anonymous Globus Transfer not supported.")
            return {
                "success": False,
                "message": "Anonymous Globus Transfer not supported."
            }
        dest = os.path.abspath(dest)
        # If results have info attached, remove it
        if type(results) is tuple:
            results = results[0]
        if not dest_ep:
            if not self.local_ep:
                self.local_ep = globus_sdk.LocalGlobusConnectPersonal().endpoint_id
            dest_ep = self.local_ep
        if not inactivity_time:
            inactivity_time = self.__inactivity_time

        # Assemble the transfer data
        tasks = {}
        filenames = set()
        links_processed = set()
        for res in tqdm(results, desc="Processing records", disable=(not verbose)):
            file_list = []
            if res["mdf"]["resource_type"] == "dataset":
                if download_datasets:
                    g_link = res.get("data", {}).get("endpoint_path", None)
                    if g_link:
                        file_list.append(g_link)
                elif verbose:
                    print("Skipping dataset '{}' because argument 'download_datasets' is "
                          "False. Use caution if enabling.".format(res["mdf"]["source_id"]))
            elif res["mdf"]["resource_type"] == "record":
                for dl in res.get("files", []):
                    g_link = dl.get("globus", None)
                    if g_link:
                        file_list.append(g_link)
            else:
                print("Error: Found unknown resource_type '{}'. "
                      "Skipping entry.".format(res["mdf"]["resource_type"]))

            for globus_link in file_list:
                # If the data is on a Globus Endpoint
                if globus_link not in links_processed:
                    links_processed.add(globus_link)
                    ep_id = urlparse(globus_link).netloc
                    ep_path = urlparse(globus_link).path
                    # local_path should be either dest + whole path or dest + filename
                    if preserve_dir:
                        # ep_path is absolute, so os.path.join does not work
                        local_path = os.path.abspath(dest + ep_path)
                    else:
                        # If ep_path is to dir, basename is ''
                        # basename(dirname(ep_path)) gives just first dir's name
                        base_name = os.path.basename(ep_path)
                        if not base_name:
                            base_name = os.path.basename(os.path.dirname(ep_path))
                        local_path = os.path.abspath(os.path.join(dest, base_name))

                    # Make dirs for storing the file if they don't exist
                    # preserve_dir doesn't matter; local_path has accounted for it already
                    try:
                        os.makedirs(os.path.dirname(local_path))
                    # If dest is current dir and preserve_dir=False, there are no dirs to make.
                    # os.makedirs() will raise FileNotFoundError (Python3 subclass of IOError).
                    # Since it means all dirs required exist, it can be swallowed.
                    except (IOError, OSError):
                        pass

                    if not preserve_dir:
                        # Check if file already exists, change filename if necessary
                        # The pattern is to add a number just before the extension
                        # (e.g., myfile(1).ext)
                        collisions = 0
                        while os.path.exists(local_path) or local_path in filenames:
                            # Get extension, if exists
                            base_file, ext = os.path.splitext(local_path)
                            # Check if already added number to end
                            old_add = "(" + str(collisions) + ")"
                            collisions += 1
                            new_add = "(" + str(collisions) + ")"
                            if base_file.endswith(old_add):
                                local_path = base_file[:-len(old_add)] + new_add + ext
                            else:
                                local_path = base_file + new_add + ext

                    # If ep_path points to a dir, the trailing slash has been removed
                    if ep_path.endswith("/"):
                        local_path += "/"

                    # Add data to list of transfer files
                    if ep_id not in tasks.keys():
                        tasks[ep_id] = []
                    tasks[ep_id].append((ep_path, local_path))
                    filenames.add(local_path)

        # Submit the jobs
        success = 0
        failed = 0
        for task_ep, task_paths in tqdm(tasks.items(), desc="Transferring data",
                                        disable=(not verbose)):
            transfer = mdf_toolbox.custom_transfer(self.__transfer_client, task_ep, dest_ep,
                                                   task_paths, interval=self.__transfer_interval,
                                                   inactivity_time=inactivity_time)
            cont = True
            # Prime loop
            event = next(transfer)
            try:
                # Loop ends on StopIteration from generator exhaustion
                while True:
                    if not event["success"] and cont:
                        print("Error: {} - {}".format(event["code"], event["description"]))
                        if verbose:
                            # Allow user to abort transfer if verbose, else cont is always True
                            user_cont = input("Continue Transfer (y/n)?\n")
                            cont = (user_cont.strip().lower() == "y"
                                    or user_cont.strip().lower() == "yes")
                    event = transfer.send(cont)
            except StopIteration:
                pass
            if not event["success"]:
                print("Error transferring with endpoint '{}': {} - "
                      "{}".format(task_ep, event["status"],
                                  event["nice_status_short_description"]))
                failed += 1
                # Allow cancellation of remaining Transfers if Transfers are remaining
                if verbose and list(tasks.keys())[-1] != task_ep:
                    user_cont = input("Continue Transfer (y/n)?\n")
                    if not (user_cont.strip().lower() == "y"
                            or user_cont.strip().lower() == "yes"):
                        break
            else:
                success += 1

        if verbose:
            print("All transfers processed\n{} transfers succeeded\n"
                  "{} transfers failed".format(success, failed))
        return
    def http_stream(self, results, verbose=True):
        """Yield data files from the provided results using HTTPS, through a generator.
        For a large number of files, you should use ``globus_download()`` instead,
        which uses Globus Transfer.

        Arguments:
            results (dict): The records from which files should be fetched.
                    This should be the return value of a search method.
            verbose (bool): If ``True``, status and progress messages will be printed.
                    If ``False``, only error messages will be printed. **Default:** ``True``.

        Yields:
            str: Text of each data file.
        """
        if self.__anonymous:
            print("Error: Anonymous HTTP download not yet supported.")
            yield {
                "success": False,
                "message": "Anonymous HTTP download not yet supported."
            }
            return
        # If results have info attached, remove it
        if type(results) is tuple:
            results = results[0]
        if type(results) is not list:
            results = [results]
        if len(results) > HTTP_NUM_LIMIT:
            print("Too many results supplied. Use globus_download()"
                  + " for fetching more than " + str(HTTP_NUM_LIMIT) + " entries.")
            yield {
                "success": False,
                "message": ("Too many results supplied. Use globus_download()"
                            + " for fetching more than " + str(HTTP_NUM_LIMIT) + " entries.")
            }
            return
        for res in results:
            for dl in res.get("files", []):
                url = dl.get("url", None)
                if url:
                    parsed_url = urlparse(url)
                    headers = {}
                    # Check for Petrel vs. NCSA url for authorizer
                    # Petrel
                    if parsed_url.netloc == ("e38ee745-6d04-11e5-ba46-22000b92c6ec"
                                             ".e.globus.org"):
                        authorizer = self.__petrel_authorizer
                    elif parsed_url.netloc == "data.materialsdatafacility.org":
                        authorizer = self.__data_mdf_authorizer
                    else:
                        authorizer = globus_sdk.NullAuthorizer()
                    authorizer.set_authorization_header(headers)
                    response = requests.get(url, headers=headers)
                    # Handle first 401 by regenerating auth headers
                    if response.status_code == 401:
                        authorizer.handle_missing_authorization()
                        authorizer.set_authorization_header(headers)
                        response = requests.get(url, headers=headers)
                    # Handle other errors by passing the buck to the user
                    if response.status_code != 200:
                        print("Error ", response.status_code, " when attempting to access '",
                              url, "'", sep="")
                        yield None
                    else:
                        yield response.text
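    # Usage sketch (illustrative, assuming an authenticated Forge instance `mdf`):
    # stream file contents without writing them to disk.
    #
    #     for text in mdf.http_stream(mdf.match_elements("C").search(limit=5)):
    #         if text is not None:
    #             print(text[:80])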
    # ***********************************************
    # * Misc Forge-specific utility functions
    # ***********************************************
    def describe_field(self, resource_type, field=None, raw=False):
        """Fetch and display the description of a field in MDF, along with any subfields.

        Arguments:
            resource_type (str): The type of MDF entry to describe a field from.
                    This value can be ``"dataset"`` or ``"record"``.
            field (str): The field to describe, in dot notation. The field must be a part
                    of the provided ``resource_type``. To see all fields in the given
                    ``resource_type``, use the value ``None``. **Default:** ``None``
            raw (bool): When ``False``, will format and print the schema.
                    When ``True``, will return the raw JSON dictionary instead.
                    For human consumption, ``False`` is recommended. **Default:** ``False``
        """
        if field == "None" or field == "all":
            field = None
        res = requests.get(self._schemas_url + resource_type)
        # Check for success
        error = None
        schema = None
        try:
            json_res = res.json()
        except Exception:
            if res.status_code < 300:
                error = "Error decoding {} response: {}".format(res.status_code, res.content)
            else:
                error = ("Error {}. MDF may be experiencing technical difficulties."
                         .format(res.status_code))
        else:
            if res.status_code >= 300:
                error = "Error {}: {}".format(res.status_code, json_res["error"])
            else:
                # Support (Forge-undocumented) "all" and "list" keywords
                schema = json_res.get("schema",
                                      json_res.get("all_schemas",
                                                   json_res.get("schema_list", {})))
        # If user requested a specific field, extract only that
        if field and error is None:
            try:
                subfields = field.split(".")
                while len(subfields) > 0:
                    subfield = subfields.pop(0)
                    # If subfield is not in schema, try pulling out "properties"
                    # or "items" first
                    # "items" must be first to pull "properties" from "items"
                    if subfield not in schema.keys():
                        schema = schema.get("items", schema)
                        schema = schema.get("properties", schema)
                    # Pull out subfield, triggering KeyError if not present
                    schema = schema[subfield]
            # KeyError here means field not in schema
            except KeyError as e:
                error = ("Error: Field {} (from '{}') not found in schema for "
                         "resource_type '{}'".format(str(e), field, resource_type))
                schema = None

        # Return if raw=True
        if raw:
            return {
                "success": error is None,
                "error": error,
                "schema": schema,
                "status_code": res.status_code
            }
        # Otherwise, print the result
        else:
            if error is not None:
                print(error)
            else:
                [print(line) for line in mdf_toolbox.prettify_jsonschema(schema)]
        return
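    # Usage sketch (illustrative, assuming a Forge instance `mdf`): inspect part of the
    # record schema, or fetch the raw schema dictionary instead of printing it.
    #
    #     mdf.describe_field("record", field="material.elements")
    #     raw = mdf.describe_field("dataset", raw=True)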
    def describe_organization(self, organization, summary=False, raw=False):
        """Fetch and display the description of an organization registered with MDF.

        Arguments:
            organization (str): The organization to describe. This value can also be
                    ``"list"`` to list all organizations' names, or ``"all"`` to fetch
                    the metadata for every organization (not recommended).
            summary (bool): When ``True``, will summarize the organization metadata.
                    The summary just contains the non-technical information about
                    the organization itself. When ``False``, will print all of the metadata.
                    This parameter has no effect if ``raw=True``. **Default:** ``False``
            raw (bool): When ``False``, will format and print the organization metadata.
                    When ``True``, will return the raw JSON dictionary instead.
                    For human consumption, ``False`` is recommended. **Default:** ``False``
        """
        res = requests.get(self._organizations_url + organization)
        # Check for success
        error = None
        org_res = None
        try:
            json_res = res.json()
        except Exception:
            if res.status_code < 300:
                error = "Error decoding {} response: {}".format(res.status_code, res.content)
            else:
                error = ("Error {}. MDF may be experiencing technical difficulties."
                         .format(res.status_code))
        else:
            if res.status_code >= 300:
                error = "Error {}: {}".format(res.status_code, json_res["error"])
            else:
                # Support "all" and "list" keywords
                org_res = json_res.get("organization",
                                       json_res.get("all_organizations",
                                                    json_res.get("organization_list", {})))
        # Return if raw=True
        if raw:
            return {
                "success": error is None,
                "error": error,
                "organization": org_res,
                "status_code": res.status_code
            }
        # Otherwise, print the result
        else:
            if error is not None:
                print(error)
            else:
                # Support "all" and "list"
                if not isinstance(org_res, list):
                    org_res = [org_res]
                for org in org_res:
                    # Only "list" is non-dict, just print org name and continue
                    if not isinstance(org, dict):
                        print(org)
                        continue
                    print("\n", org["canonical_name"])
                    # If user just wants a summary, pop the non-summary keys
                    # Essentially, the summary is non-technical info,
                    # just describing the org itself - not in MDF context
                    if summary:
                        org.pop("canonical_name", None)  # Already printed
                        org.pop("permission_groups", None)
                        org.pop("acl", None)
                        org.pop("data_destinations", None)
                        org.pop("curation", None)
                        org.pop("project_blocks", None)
                        org.pop("required_fields", None)
                        org.pop("services", None)
                    # Don't display "None" parents
                    if not org.get("parent_organizations"):
                        org.pop("parent_organizations", None)
                    [print(line) for line in mdf_toolbox.prettify_json(org)]
        return
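    # Usage sketch (illustrative, assuming a Forge instance `mdf`): list registered
    # organizations, then summarize one. The organization name below is an example only.
    #
    #     mdf.describe_organization("list")
    #     mdf.describe_organization("Example Organization", summary=True)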