Source code for fileflow.storage_drivers.storage_driver

"""
.. module:: storage_drivers.storage_driver
    :synopsis: Abstraction of the file system for reading and writing task files.

.. moduleauthor:: David Barbarisi <dbarbarisi@industrydive.com>
"""


class StorageDriver(object):
    """
    A base class for common functionality and API amongst the storage drivers.

    Most methods here raise :py:exc:`NotImplementedError` and must be
    overridden by concrete storage drivers. ``execution_date_string`` and
    ``list_filenames_in_task`` are implemented in terms of the other methods.

    This is an example of mocking the ``read`` method inside a
    :py:class:`~fileflow.operators.DiveOperator`:

    .. code-block:: python

        # Inside a TestCase where self.sensor is a custom sensor

        # Assign the desired input string to the return
        # value of the 'read' method
        attrs = {
            'read.return_value': 'output from earlier task'
        }
        self.sensor.storage = Mock(**attrs)

        # Do stuff that will call storage.read()

        # Later, we want to make sure the `read` method has been
        # called the correct number of times
        self.assertEqual(1, self.sensor.storage.read.call_count)
    """

    def get_filename(self, dag_id, task_id, execution_date):
        """
        Return an identifying path or URL to the file related to an airflow
        task instance.

        Concrete storage drivers should implement this method.

        :param str dag_id: The airflow DAG ID.
        :param str task_id: The airflow task ID.
        :param datetime.datetime execution_date: The datetime for the task
            instance.
        :return: The identifying path or URL.
        :rtype: str
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def get_path(self, dag_id, task_id):
        """
        Return the path portion where files would be stored for the given task.

        This should generally be the same as ``get_filename`` except without
        the filename (execution date) portion.

        :param str dag_id: The DAG ID.
        :param str task_id: The task ID.
        :return: A path to the task's intermediate storage.
        :rtype: str
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def read(self, dag_id, task_id, execution_date, encoding):
        """
        Read the data output from the given airflow task instance.

        Concrete storage drivers should implement this method.

        :param str dag_id: The airflow DAG ID.
        :param str task_id: The airflow task ID.
        :param datetime.datetime execution_date: The datetime for the task
            instance.
        :param str encoding: The encoding to use for reading in the data.
        :return: The data from the file.
        :rtype: str
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def get_read_stream(self, dag_id, task_id, execution_date):
        """
        Return a stream for reading the data output from the given airflow
        task instance.

        Concrete storage drivers should implement this method. It is the
        streaming counterpart of ``read``; the concrete type of the returned
        stream is driver-specific.

        :param str dag_id: The airflow DAG ID.
        :param str task_id: The airflow task ID.
        :param datetime.datetime execution_date: The datetime for the task
            instance.
        :return: A readable stream over the task instance's stored data.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def write(self, dag_id, task_id, execution_date, data, *args, **kwargs):
        """
        Write data to the output file identified by the airflow task instance.

        Concrete storage drivers should implement this method.

        :param str dag_id: The airflow DAG ID.
        :param str task_id: The airflow task ID.
        :param datetime.datetime execution_date: The datetime for the task
            instance.
        :param str data: The data to write.
        :param args: Extra positional arguments for child classes (currently
            used in S3StorageDriver).
        :param kwargs: Extra keyword arguments for child classes, for the same
            reason as ``args``.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def write_from_stream(self, dag_id, task_id, execution_date, stream,
                          *args, **kwargs):
        """
        Write the contents of ``stream`` to the output file identified by the
        airflow task instance.

        Concrete storage drivers should implement this method. It is the
        streaming counterpart of ``write``.

        :param str dag_id: The airflow DAG ID.
        :param str task_id: The airflow task ID.
        :param datetime.datetime execution_date: The datetime for the task
            instance.
        :param stream: A readable stream supplying the data to write.
        :param args: Extra positional arguments for child classes (currently
            used in S3StorageDriver).
        :param kwargs: Extra keyword arguments for child classes, for the same
            reason as ``args``.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def execution_date_string(self, execution_date):
        """
        Format the execution date per our standard file naming convention.

        The convention is ``YYYY-MM-DD`` (e.g. ``2017-03-05``). Any time-of-day
        component is dropped.

        :param datetime.datetime execution_date: The airflow task instance
            execution date.
        :return: The formatted date string.
        :rtype: str
        """
        return execution_date.strftime("%Y-%m-%d")

    def list_filenames_in_path(self, path):
        """
        Given a storage path, get all of the filenames of files directly in
        that path.

        Note that this only provides names of files and is not recursive.

        :param str path: The storage path to list.
        :return: A list of only the filename portion of filenames in the path.
        :rtype: list[str]
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def list_filenames_in_task(self, dag_id, task_id):
        """
        Shortcut method to get a list of filenames stored in the given task's
        path.

        This is already implemented by delegating to ``get_path`` and
        ``list_filenames_in_path``. Override this if you need more
        flexibility.

        :param str dag_id: The DAG ID of the task.
        :param str task_id: The task ID.
        :return: A list of file names of files stored by the task.
        :rtype: list[str]
        """
        the_path = self.get_path(dag_id, task_id)
        return self.list_filenames_in_path(the_path)
class StorageDriverError(Exception):
    """
    Root exception type for errors raised by storage drivers.
    """