# Source code for intake_stac.catalog

import os.path
import warnings

import pystac
from intake.catalog import Catalog
from intake.catalog.local import LocalCatalogEntry
from intake.source import DataSource
from pkg_resources import get_distribution
from pystac.extensions.eo import EOExtension

# Package version, read from the installed ``intake_stac`` distribution's
# metadata; it is also published to intake as ``AbstractStacCatalog.version``.
# NOTE(review): pkg_resources is deprecated in recent setuptools releases;
# ``importlib.metadata.version('intake_stac')`` is the stdlib replacement.
__version__ = get_distribution('intake_stac').version

# STAC catalog asset 'type' determines intake driver:
# https://github.com/radiantearth/stac-spec/blob/master/item-spec/item-spec.md#media-types
#
# ``default_type`` is the media type assumed for assets that declare none;
# ``default_driver`` is the driver used for any media type not listed in
# ``drivers`` below.
default_type, default_driver = 'application/rasterio', 'rasterio'

# Media type -> intake driver name. Written as an ordered sequence of
# (media type, driver) pairs, so the resulting dict keeps this listing order.
drivers = dict(
    [
        ('application/netcdf', 'netcdf'),
        ('application/x-netcdf', 'netcdf'),
        ('application/parquet', 'parquet'),
        ('application/x-parquet', 'parquet'),
        ('application/x-hdf', 'netcdf'),
        ('application/x-hdf5', 'netcdf'),
        ('application/rasterio', 'rasterio'),
        ('image/vnd.stac.geotiff', 'rasterio'),
        ('image/vnd.stac.geotiff; cloud-optimized=true', 'rasterio'),
        ('image/x.geotiff', 'rasterio'),
        ('image/tiff; application=geotiff', 'rasterio'),
        ('image/tiff; application=geotiff; profile=cloud-optimized', 'rasterio'),  # noqa: E501
        ('image/tiff', 'rasterio'),
        ('image/jp2', 'rasterio'),
        ('image/png', 'xarray_image'),
        ('image/jpg', 'xarray_image'),
        ('image/jpeg', 'xarray_image'),
        ('text/xml', 'textfiles'),
        ('text/plain', 'textfiles'),
        ('text/html', 'textfiles'),
        ('application/json', 'textfiles'),
        ('application/geo+json', 'geopandas'),
        ('application/geopackage+sqlite3', 'geopandas'),
        ('application/vnd+zarr', 'zarr'),
        ('application/xml', 'textfiles'),
    ]
)


class AbstractStacCatalog(Catalog):
    """
    Base class for intake catalogs backed by a pystac object.

    Subclasses set ``_stac_cls`` to the pystac class they wrap, and may
    override ``_get_metadata`` to derive catalog metadata from that object.
    """

    version = __version__
    partition_access = False

    def __init__(self, stac_obj, **kwargs):
        """
        Initialize the catalog.

        Parameters
        ----------
        stac_obj : pystac.STACObject, str or os.PathLike
            An instance of ``self._stac_cls``, or a path/URL from which one is
            read with ``self._stac_cls.from_file``.
        kwargs : dict, optional
            Passed to intake.Catalog.__init__. A ``metadata`` entry (if given
            and not None) is passed as keyword arguments to ``_get_metadata``.
            A ``name`` entry (if given) is used as the catalog name instead of
            the STAC object's ``id``.

        Raises
        ------
        ValueError
            If ``stac_obj`` is neither a ``self._stac_cls`` instance nor a
            str/path-like object.
        """
        if isinstance(stac_obj, self._stac_cls):
            self._stac_obj = stac_obj
        elif isinstance(stac_obj, (str, os.PathLike)):
            # os.fspath returns str input unchanged and converts path objects.
            self._stac_obj = self._stac_cls.from_file(os.fspath(stac_obj))
        else:
            raise ValueError('Expected %s instance, got: %s' % (self._stac_cls, type(stac_obj)))

        # Treat an explicit metadata=None like "no metadata" instead of
        # crashing on ``**None``.
        metadata = self._get_metadata(**(kwargs.pop('metadata', None) or {}))

        # Pop a caller-supplied name first. Otherwise, a failed ``.id`` lookup
        # (evaluated eagerly as a ``pop`` default) would leave 'name' in kwargs
        # and it would be passed twice to ``super().__init__``.
        if 'name' in kwargs:
            name = kwargs.pop('name')
        else:
            try:
                name = self._stac_obj.id
            except AttributeError:
                # Not currently tested.
                # ItemCollection does not require an id
                # Unclear what the state of ItemCollection is.
                name = str(type(self._stac_obj))

        super().__init__(name=name, metadata=metadata, **kwargs)

    @classmethod
    def from_url(cls, url, **kwargs):
        """
        Initialize the catalog from a STAC url.

        Parameters
        ----------
        url: str
            A URL pointing to a STAC json object
        kwargs : dict, optional
            Passed to intake.Catalog.__init__
        """
        stac_obj = cls._stac_cls.from_file(url)
        return cls(stac_obj, **kwargs)

    def _get_metadata(self, **kwargs):
        """Return the caller-supplied metadata unchanged (subclasses override)."""
        return kwargs  # pragma: no cover

    def serialize(self):
        """
        Serialize the catalog to yaml.

        Returns
        -------
        A string with the yaml-formatted catalog (just top-level).
        """
        return self.yaml()


class StacCatalog(AbstractStacCatalog):
    """
    Maps Intake Catalog to a STAC Catalog
    https://pystac.readthedocs.io/en/latest/api.html?#catalog-spec
    """

    name = 'stac_catalog'
    _stac_cls = pystac.Catalog

    def _load(self):
        """
        Load the STAC Catalog.

        Each child catalog/collection becomes a nested StacCatalog /
        StacCollection entry. Each item becomes a StacItem entry. Entries are
        keyed by the STAC object id.
        """
        for subcatalog in self._stac_obj.get_children():
            if isinstance(subcatalog, pystac.Collection):
                # Collection subclasses Catalog, so check it first
                driver = StacCollection
            else:
                driver = StacCatalog

            # Prefer the child's self href so the entry serializes as a
            # reference. When the child has no href (e.g. an in-memory
            # catalog), pass the object itself, which the driver also accepts.
            href = subcatalog.get_self_href()
            self._entries[subcatalog.id] = LocalCatalogEntry(
                name=subcatalog.id,
                description=subcatalog.description,
                driver=driver,  # recursive
                catalog=self,
                args={'stac_obj': href if href is not None else subcatalog},
            )

        for item in self._stac_obj.get_items():
            self._entries[item.id] = LocalCatalogEntry(
                name=item.id,
                description='',
                driver=StacItem,
                catalog=self,
                args={'stac_obj': item},
            )

    def _get_metadata(self, **kwargs):
        """
        Keep copy of all STAC JSON except for links, updated with any
        caller-supplied metadata (``kwargs``).
        """
        # NOTE: why not links?
        metadata = self._stac_obj.to_dict()
        metadata.pop('links', None)
        # Merge user metadata, consistent with StacItem._get_metadata.
        metadata.update(kwargs)
        return metadata
class StacCollection(StacCatalog):
    """
    Maps Intake Catalog to a STAC Collection
    https://pystac.readthedocs.io/en/latest/api.html#collection-spec

    Collections have a number of properties that Catalogs do not, most notably
    the spatial and temporal extents. This is currently a placeholder for
    future Collection-specific attributes and methods.
    """

    name = 'stac_catalog'
    _stac_cls = pystac.Collection

    def get_asset(
        self,
        key,
        storage_options=None,
        merge_asset_storage_options=True,
        merge_asset_open_kwargs=True,
        **kwargs,
    ):
        r"""
        Get a datasource for a collection-level asset.

        Parameters
        ----------
        key : str
            The key of the collection-level asset to load.
        storage_options : dict, optional
            Additional arguments for the backend fsspec filesystem. The
            caller's dict is not modified.
        merge_asset_storage_options : bool, default True
            Whether to merge the storage options provided by the asset under the
            ``xarray:storage_options`` key with `storage_options`. Asset-provided
            values take precedence on conflicting keys.
        merge_asset_open_kwargs : bool, default True
            Whether to merge the keywords provided by the asset under the
            ``xarray:open_kwargs`` key with ``**kwargs``. Asset-provided values
            take precedence on conflicting keys.
        **kwargs
            Additional keyword options are provided to the loader, for example
            ``consolidated=True`` to pass to :meth:`xarray.open_zarr`.

        Notes
        -----
        The Media Type of the asset will be used to determine how to load the data.

        Returns
        -------
        DataSource
            The dataset described by the asset loaded into a dask-backed object.

        Raises
        ------
        KeyError
            If ``key`` is not one of the collection's asset keys.
        """
        try:
            asset = self._stac_obj.assets[key]
        except KeyError:
            raise KeyError(
                f'No asset named {key}. Should be one of {list(self._stac_obj.assets)}'
            ) from None

        # Copy so the merge below never mutates the caller's dict.
        storage_options = dict(storage_options or {})
        if merge_asset_storage_options:
            asset_storage_options = asset.extra_fields.get('xarray:storage_options', {})
            storage_options.update(asset_storage_options)

        if merge_asset_open_kwargs:
            # ``kwargs`` is a fresh dict built for this call, so updating it is safe.
            asset_open_kwargs = asset.extra_fields.get('xarray:open_kwargs', {})
            kwargs.update(asset_open_kwargs)

        return StacAsset(key, asset)(storage_options=storage_options, **kwargs)
class StacItemCollection(AbstractStacCatalog):
    """
    Maps ItemCollection returned from a STAC API to Intake (Sub)Catalog
    https://github.com/radiantearth/stac-api-spec/tree/master/fragments/itemcollection

    Note search results often use the single file stac extension:
    https://pystac.readthedocs.io/en/latest/api.html?#single-file-stac-extension
    """

    name = 'stac_itemcollection'
    _stac_cls = pystac.ItemCollection

    def _load(self):
        """
        Load the STAC Item Collection: one StacItem entry per item, keyed by
        item id.
        """
        for item in self._stac_obj.items:
            self._entries[item.id] = LocalCatalogEntry(
                name=item.id,
                description='',
                driver=StacItem,
                catalog=self,
                args={'stac_obj': item},
            )

    def to_geopandas(self, crs=None):
        """
        Load the STAC Item Collection into a geopandas GeoDataFrame

        Parameters
        ----------
        crs : str or dict (optional)
            Coordinate reference system to set on the resulting frame.
            Defaults to 'epsg:4326' when not given.

        Returns
        -------
        GeoDataFrame

        Raises
        ------
        ImportError
            If geopandas is not installed.
        """
        try:
            import geopandas as gpd
        except ImportError as err:
            raise ImportError(
                'Using to_geopandas requires the `geopandas` package. '
                'You can install it via Pip or Conda.'
            ) from err

        if crs is None:
            crs = 'epsg:4326'
        return gpd.GeoDataFrame.from_features(self._stac_obj.to_dict(), crs=crs)
class StacItem(AbstractStacCatalog):
    """
    Maps STAC Item to Intake (Sub)Catalog
    https://pystac.readthedocs.io/en/latest/api.html#item-spec

    Each asset of the item becomes one StacAsset entry keyed by its asset key.
    """

    name = 'stac_item'
    _stac_cls = pystac.Item

    def __getitem__(self, key):
        """
        Return the entry for asset ``key``.

        If the entry resolves to a DataSource, it is re-created. The asset's
        ``xarray:storage_options`` is passed as ``storage_options``, and its
        ``xarray:open_kwargs`` are merged over the captured init kwargs.
        """
        result = super().__getitem__(key)
        # TODO: handle non-string assets?
        asset = self._entries[key]
        extra_fields = asset._stac_obj.extra_fields
        storage_options = extra_fields.get('xarray:storage_options', {})
        open_kwargs = extra_fields.get('xarray:open_kwargs', {})
        if isinstance(result, DataSource):
            # Later entries win: the asset's storage options replace any
            # captured ones, and its open kwargs override everything else.
            kwargs = {
                **result._captured_init_kwargs,
                'storage_options': storage_options,
                **open_kwargs,
            }
            result = result(*result._captured_init_args, **kwargs)
        return result

    def _load(self):
        """
        Load the STAC Item: one StacAsset entry per asset, keyed by asset key.
        """
        for key, value in self._stac_obj.assets.items():
            self._entries[key] = StacAsset(key, value)

    def _get_metadata(self, **kwargs):
        """
        Build the item metadata and update it with ``kwargs``.

        Starts from a copy of the item properties, then adds ``bbox``,
        ``geometry``, ``datetime`` and ``date`` (None when the item has no
        such attribute).
        """
        metadata = self._stac_obj.properties.copy()
        for attr in ['bbox', 'geometry', 'datetime', 'date']:
            metadata[attr] = getattr(self._stac_obj, attr, None)
        metadata.update(kwargs)
        return metadata

    def _get_band_info(self):
        """
        Return list of band info dictionaries (name, common_name, etc.)...

        Returns an empty list when no ``eo:bands`` are found; pystac's
        ``bands`` property can be None in that case.
        """
        bands = EOExtension.ext(self._stac_obj).bands or []
        return [band.to_dict() for band in bands]

    @staticmethod
    def _valid_band_names(band_info):
        """Sorted, de-duplicated band ids/names and common names, skipping missing ones."""
        names = set()
        for b in band_info:
            names.add(b.get('id', b.get('name')))
            names.add(b.get('common_name'))
        # Bands without a common_name contribute None, which cannot be sorted
        # together with strings.
        names.discard(None)
        return sorted(names)

    def stack_bands(self, bands, path_as_pattern=None, concat_dim='band'):
        """
        Stack the listed bands over the ``band`` dimension.

        This method only works for STAC Items using the 'eo' Extension
        https://github.com/radiantearth/stac-spec/tree/master/extensions/eo

        NOTE: This method is not aware of geotransform information. It *assumes*
        bands for a given STAC Item have the same coordinate reference system (CRS).
        This is usually the case for a given multi-band satellite acquisition.
        Coordinate alignment is performed automatically upon calling the
        `to_dask()` method to load into an Xarray DataArray if bands have different
        ground sample distance (gsd) or array shapes.

        Parameters
        ----------
        bands : list of str
            Bands to stack, given as band ids/names (asset keys) or common
            names (e.g. ['B4', 'B5'], ['red', 'nir']).
        path_as_pattern : str, optional
            Passed through to the rasterio driver (e.g. '{band}.TIF').
        concat_dim : str, default 'band'
            Name of the dimension the bands are concatenated along.

        Returns
        -------
        CombinedAssets
            Catalog entry mapping the selected asset hrefs to Xarray bands.

        Raises
        ------
        ValueError
            If the item does not implement the 'eo' extension, ``bands`` is
            empty, a band cannot be matched to an asset, or the selected
            assets do not share a single media type.

        Examples
        --------
        stack = item.stack_bands(['nir','red'])
        da = stack(chunks=dict(band=1, x=2048, y=2048)).to_dask()

        stack = item.stack_bands(['B4','B5'], path_as_pattern='{band}.TIF')
        da = stack(chunks=dict(band=1, x=2048, y=2048)).to_dask()
        """
        if not EOExtension.has_extension(self._stac_obj):
            raise ValueError('STAC Item must implement "eo" extension to use this method')
        if not bands:
            raise ValueError('Stacking failed: at least one band must be requested')

        band_info = self._get_band_info()
        assets = self._stac_obj.assets
        metadatas = {}
        titles = []
        hrefs = []
        types = []

        for band in bands:
            # band can be band id, name or common_name
            if band in assets:
                info = next(
                    (b for b in band_info if b.get('id', b.get('name')) == band),
                    None,
                )
            else:
                info = next((b for b in band_info if b.get('common_name') == band), None)
                if info is not None:
                    # Resolve the common name to the id/name used as asset key.
                    band = info.get('id', info.get('name'))

            if band not in assets or info is None:
                raise ValueError(
                    f'{band} not found in list of eo:bands in collection. '
                    f'Valid values: {self._valid_band_names(band_info)}'
                )

            asset = assets[band]
            metadatas[band] = asset.to_dict()
            titles.append(band)
            types.append(asset.media_type)
            hrefs.append(asset.href)

        unique_types = set(types)
        if len(unique_types) != 1:
            raise ValueError(
                f'Stacking failed: bands must have the same type, multiple found: {unique_types}'
            )

        config = {
            'name': '_'.join(bands),
            'description': ', '.join(titles),
            'args': dict(
                chunks={}, concat_dim=concat_dim, path_as_pattern=path_as_pattern, urlpath=hrefs
            ),
            'metadata': metadatas,
        }
        return CombinedAssets(config)

    def _yaml(self):
        """
        Build the yaml-ready dict for this item.

        Contains the item metadata plus one source per asset entry; each
        source is marked ``direct_access='allow'`` and has ``catalog_dir``
        stripped from its metadata.
        """
        data = {'metadata': {}, 'sources': {}}
        data['metadata'].update(self.metadata)
        for key, source in self.items():
            data['sources'][key] = source._yaml()['sources']['stac_asset']
            data['sources'][key]['direct_access'] = 'allow'
            data['sources'][key]['metadata'].pop('catalog_dir', None)
        return data
class StacAsset(LocalCatalogEntry):
    """
    Maps 1 STAC Item Asset to 1 Intake Catalog Entry
    https://pystac.readthedocs.io/en/latest/api.html#asset
    """

    name = 'stac_asset'
    _stac_cls = pystac.item.Asset

    def __init__(self, key, asset):
        """
        Construct an Intake catalog 'Source' from a STAC Item Asset.

        Parameters
        ----------
        key : str
            Asset key, used as the entry name.
        asset : pystac.item.Asset
            The STAC asset to wrap. If its media type is missing,
            ``_get_driver`` assigns one on the asset in place (with a warning).
        """
        self._stac_obj = asset
        # The driver must be resolved first: it may fill in asset.media_type,
        # which _get_metadata/_get_plot then read.
        driver = self._get_driver(asset)
        super().__init__(
            name=key,
            description=asset.title,
            driver=driver,
            direct_access='allow',
            args=self._get_args(asset, driver),
            metadata=self._get_metadata(asset),
        )

    def _get_metadata(self, asset):
        """
        Copy STAC Asset Metadata and setup default plot
        """
        metadata = asset.to_dict()
        default_plot = self._get_plot(asset)
        if default_plot:
            metadata['plots'] = default_plot
        return metadata

    def _get_plot(self, asset):
        """
        Default hvplot plot based on Asset mimetype.

        Returns a 'thumbnail' RGB plot spec for JPEG/PNG assets, a 'geotiff'
        image plot spec when the media type contains 'tiff', otherwise None.
        """
        # NOTE: consider geojson, parquet, hdf defaults in future...
        default_plot = None
        media_type = asset.media_type
        if media_type:
            if media_type in ['image/jpeg', 'image/jpg', 'image/png']:
                default_plot = dict(
                    thumbnail=dict(
                        kind='rgb',
                        x='x',
                        y='y',
                        bands='channel',
                        data_aspect=1,
                        flip_yaxis=True,
                        xaxis=False,
                        yaxis=False,
                    )
                )
            elif 'tiff' in media_type:
                default_plot = dict(
                    geotiff=dict(
                        kind='image',
                        x='x',
                        y='y',
                        frame_width=500,
                        data_aspect=1,
                        rasterize=True,
                        dynamic=True,
                        cmap='viridis',
                    )
                )
        return default_plot

    def _get_driver(self, asset):
        """
        Assign intake driver for data I/O.

        When the asset has no usable media type ('', 'null' or None), one is
        inferred from the file suffix of the asset href and written back to
        ``asset.media_type`` with a warning. '.nc', '.h5' and '.hdf' map to
        'application/netcdf'; anything else maps to ``default_type``.
        Media types not listed in ``drivers`` fall back to ``default_driver``.
        """
        entry_type = asset.media_type

        if entry_type in ['', 'null', None]:
            from urllib.parse import urlparse

            suffix = '.tif'
            if asset.href:
                # Take the suffix from the href's path component, so a query
                # string (e.g. a signed URL) does not end up in the extension.
                suffix = os.path.splitext(urlparse(asset.href).path)[-1].lower()
            if suffix in ['.nc', '.h5', '.hdf']:
                asset.media_type = 'application/netcdf'
                warnings.warn(
                    f'STAC Asset "type" missing, assigning {asset.media_type} based on href suffix {suffix}:\n{asset.href}'  # noqa: E501
                )
            else:
                asset.media_type = default_type
                warnings.warn(
                    f'STAC Asset "type" missing, assuming default type={default_type}:\n{asset}'  # noqa: E501
                )
            entry_type = asset.media_type

        # if mimetype not registered try rasterio driver
        return drivers.get(entry_type, default_driver)

    def _get_args(self, asset, driver):
        """
        Optional keyword arguments to pass to intake driver.

        Always passes the asset href as ``urlpath``. Adds ``chunks={}`` for
        the netcdf, rasterio and xarray_image drivers.
        """
        args = {'urlpath': asset.href}
        if driver in ['netcdf', 'rasterio', 'xarray_image']:
            # NOTE: force using dask?
            args.update(chunks={})
        return args


class CombinedAssets(LocalCatalogEntry):
    """
    Maps multiple STAC Item Assets to 1 Intake Catalog Entry
    """

    def __init__(self, configDict):
        """
        Parameters
        ----------
        configDict : dict
            Intake entry dictionary from the ``stack_bands()`` method, with the
            keys 'name', 'description', 'args' and 'metadata'.
        """
        super().__init__(
            name=configDict['name'],
            description=configDict['description'],
            driver='rasterio',  # stack_bands only relevant to rasterio driver?
            direct_access=True,
            args=configDict['args'],
            metadata=configDict['metadata'],
        )