fileflow.storage_drivers package
Base StorageDriver class
-
class fileflow.storage_drivers.storage_driver.StorageDriver
Bases: object
A base class for the functionality and API shared among the storage drivers.
This is an example of mocking the read method inside a DiveOperator test:

```python
from unittest.mock import Mock  # on Python 2, use `from mock import Mock`

# Inside a TestCase where self.sensor is a custom sensor.
# Assign the desired input string to the return value of the 'read' method.
attrs = {
    'read.return_value': 'output from earlier task'
}
self.sensor.storage = Mock(**attrs)

# Do stuff that will call storage.read()

# Later, make sure the `read` method has been called
# the correct number of times.
self.assertEqual(1, self.sensor.storage.read.call_count)
```
-
execution_date_string(execution_date)
Format the execution date per our standard file naming convention.
Parameters: execution_date (datetime.datetime) – The airflow task instance execution date.
Returns: The formatted date string.
Return type: str
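The entry does not spell out the naming convention itself. As a minimal sketch, assuming a hypothetical ISO-8601-style `%Y-%m-%dT%H:%M:%S` format (fileflow's real convention may differ), a standalone version of the helper might look like this:

```python
from datetime import datetime


def execution_date_string(execution_date):
    # Hypothetical convention: ISO-8601 date and time, dropping microseconds
    # and timezone. The real StorageDriver method may use another format.
    return execution_date.strftime('%Y-%m-%dT%H:%M:%S')


print(execution_date_string(datetime(2016, 3, 1, 12, 30)))
# → 2016-03-01T12:30:00
```

A fixed-width, zero-padded format like this one has a convenient side effect: sorting the strings lexically also sorts them chronologically, which helps when listing a task's files.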
-
get_filename(dag_id, task_id, execution_date)
Return an identifying path or URL to the file related to an airflow task instance.
Concrete storage drivers should implement this method.
Parameters: - dag_id (str) – The airflow DAG ID.
- task_id (str) – The airflow task ID.
- execution_date (datetime.datetime) – The datetime for the task instance.
Returns: The identifying path or URL.
Return type:
-
get_path(dag_id, task_id)
Return the path portion where files would be stored for the given task.
This should generally be the same as get_filename, but without the filename (execution date) portion.
Parameters: - dag_id (str) – The airflow DAG ID.
- task_id (str) – The airflow task ID.
Returns: A path to the task’s intermediate storage.
Return type:
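The relationship between get_path and get_filename can be sketched with a hypothetical local layout `<prefix>/<dag_id>/<task_id>/<execution date>`. The `PREFIX` value, the layout and the date format are all assumptions made for illustration:

```python
import os
from datetime import datetime

PREFIX = '/tmp/fileflow'  # hypothetical storage root


def get_path(dag_id, task_id):
    # The task's directory: everything except the filename portion.
    return os.path.join(PREFIX, dag_id, task_id)


def get_filename(dag_id, task_id, execution_date):
    # The task's directory plus the formatted execution date as the filename.
    return os.path.join(get_path(dag_id, task_id),
                        execution_date.strftime('%Y-%m-%dT%H:%M:%S'))


print(get_filename('my_dag', 'extract', datetime(2016, 3, 1)))
# on POSIX → /tmp/fileflow/my_dag/extract/2016-03-01T00:00:00
```

Building get_filename on top of get_path keeps the two consistent by construction: stripping the last component of a filename always yields the task's path.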
-
get_read_stream(dag_id, task_id, execution_date)
Parameters: - dag_id (str) – The airflow DAG ID.
- task_id (str) – The airflow task ID.
- execution_date (datetime.datetime) – The datetime for the task instance.
Returns:
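The entry does not say what kind of object is returned. Assuming a file-like object opened in binary mode (an assumption), a local-file sketch shows why a stream is useful: the caller can consume the data in chunks. For brevity the hypothetical function below takes a full filename instead of the `(dag_id, task_id, execution_date)` triple:

```python
import os
import tempfile


def get_read_stream(filename):
    # Hypothetical local-file version keyed by a full filename. Returns an
    # open binary file object; the caller must close it (e.g. via `with`).
    return open(filename, 'rb')


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'out')
    with open(path, 'wb') as f:
        f.write(b'output from earlier task')

    # Consume the stream 8 bytes at a time instead of loading it all at once.
    chunks = []
    with get_read_stream(path) as stream:
        for chunk in iter(lambda: stream.read(8), b''):
            chunks.append(chunk)
```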
-
list_filenames_in_path(path)
Given a storage path, get the filenames of all files directly in that path.
Note that this only provides names of files and is not recursive.
Parameters: path (str) – The storage path to list.
Returns: A list of only the filename portion of each file in the path.
Return type: list[str]
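For a local file system, the non-recursive behavior might be sketched as follows. This is an illustration of the contract described above, not fileflow's own implementation:

```python
import os
import tempfile


def list_filenames_in_path(path):
    # Return bare file names (not full paths) for regular files directly
    # inside `path`. Subdirectories are skipped, not descended into.
    return [name for name in os.listdir(path)
            if os.path.isfile(os.path.join(path, name))]


with tempfile.TemporaryDirectory() as tmp:
    for name in ('a.txt', 'b.txt'):
        open(os.path.join(tmp, name), 'w').close()
    os.mkdir(os.path.join(tmp, 'sub'))
    open(os.path.join(tmp, 'sub', 'c.txt'), 'w').close()
    # os.listdir order is arbitrary, so sort for a stable result.
    found = sorted(list_filenames_in_path(tmp))
```

Here `found` holds only `a.txt` and `b.txt`: the `sub` directory is filtered out, and `sub/c.txt` is never visited.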
-
list_filenames_in_task(dag_id, task_id)
Shortcut method to get a list of filenames stored in the given task’s path.
This is already implemented on top of the unimplemented methods above. Override it if you need more flexibility.
Parameters: - dag_id (str) – The airflow DAG ID.
- task_id (str) – The airflow task ID.
Returns: A list of the names of files stored by the task.
Return type:
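The shortcut can be pictured as simple composition. The sketch below assumes, without the entry confirming it, that the methods it builds on are `get_path` and `list_filenames_in_path`; `StorageDriverSketch` and `DictDriver` are hypothetical toy classes:

```python
class StorageDriverSketch(object):
    # Hypothetical stand-in for StorageDriver, reduced to the two methods
    # the shortcut is assumed to depend on.
    def get_path(self, dag_id, task_id):
        raise NotImplementedError

    def list_filenames_in_path(self, path):
        raise NotImplementedError

    def list_filenames_in_task(self, dag_id, task_id):
        # Already "implemented": it only composes the two methods above.
        return self.list_filenames_in_path(self.get_path(dag_id, task_id))


class DictDriver(StorageDriverSketch):
    # Toy in-memory driver mapping a path string to the filenames under it.
    def __init__(self, listing):
        self.listing = listing

    def get_path(self, dag_id, task_id):
        return '{}/{}'.format(dag_id, task_id)

    def list_filenames_in_path(self, path):
        return sorted(self.listing.get(path, []))


driver = DictDriver({'my_dag/extract': ['2016-03-02', '2016-03-01']})
print(driver.list_filenames_in_task('my_dag', 'extract'))
# → ['2016-03-01', '2016-03-02']
```

A concrete driver gets the shortcut for free by implementing the two lower-level methods; overriding it is only needed when, say, a backend can list a task's files more efficiently in one call.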
-
read(dag_id, task_id, execution_date, encoding)
Read the data output from the given airflow task instance.
Concrete storage drivers should implement this method.
Parameters: - dag_id (str) – The airflow DAG ID.
- task_id (str) – The airflow task ID.
- execution_date (datetime.datetime) – The datetime for the task instance.
- encoding (str) – The encoding to use for reading in the data.
Returns: The data from the file.
Return type:
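The `encoding` parameter matters because storage holds bytes, not text. A minimal local-file sketch (hypothetical, keyed by a full filename rather than the task-instance triple) shows that decoding with the wrong codec can silently produce garbled text instead of an error:

```python
import os
import tempfile


def read(filename, encoding='utf-8'):
    # Hypothetical local-file version keyed by a full filename rather than
    # (dag_id, task_id, execution_date). Raw bytes are decoded with the
    # caller-supplied encoding.
    with open(filename, 'rb') as f:
        return f.read().decode(encoding)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'out')
    with open(path, 'wb') as f:
        f.write('café'.encode('utf-8'))
    text = read(path, encoding='utf-8')       # 'café'
    garbled = read(path, encoding='latin-1')  # 'cafÃ©': wrong codec, no error
```

Latin-1 maps every byte to some character, so it never raises; the mismatch only shows up in the data.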
-
write(dag_id, task_id, execution_date, data, *args, **kwargs)
Write data to the output file identified by the airflow task instance.
Concrete storage drivers should implement this method.
Parameters: - dag_id (str) – The airflow DAG ID.
- task_id (str) – The airflow task ID.
- execution_date (datetime.datetime) – The datetime for the task instance.
- data (str) – The data to write.
- args – May be used by child classes (currently used in S3StorageDriver).
- kwargs – As with args, may be used by child classes.
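The catch-all `*args, **kwargs` let a caller pass driver-specific options through the common `write` signature, and drivers that do not need them simply ignore them. The sketch below is hypothetical: `MemoryDriver`, `TaggingDriver` and the `tag` keyword are invented for illustration and are not what S3StorageDriver actually accepts:

```python
from datetime import datetime


class MemoryDriver(object):
    # Toy in-memory driver; it accepts and ignores any extra arguments.
    def __init__(self):
        self.files = {}

    def write(self, dag_id, task_id, execution_date, data, *args, **kwargs):
        self.files[(dag_id, task_id, execution_date)] = data


class TaggingDriver(MemoryDriver):
    # Hypothetical child that consumes an extra keyword argument.
    last_tag = None

    def write(self, dag_id, task_id, execution_date, data, *args, **kwargs):
        self.last_tag = kwargs.get('tag')  # 'tag' is made up for illustration
        super(TaggingDriver, self).write(dag_id, task_id, execution_date, data)


when = datetime(2016, 3, 1)
plain, tagged = MemoryDriver(), TaggingDriver()
# The same call works against either driver.
for driver in (plain, tagged):
    driver.write('my_dag', 'extract', when, 'data', tag='v1')
```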
-
write_from_stream(dag_id, task_id, execution_date, stream, *args, **kwargs)
Write data read from a stream to the output file identified by the airflow task instance.
Concrete storage drivers should implement this method.
Parameters: - dag_id (str) – The airflow DAG ID.
- task_id (str) – The airflow task ID.
- execution_date (datetime.datetime) – The datetime for the task instance.
- stream – A stream of the data to write.
- args – May be used by child classes (currently used in S3StorageDriver).
- kwargs – As with args, may be used by child classes.
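Writing from a stream avoids holding the whole payload in memory. A local-file sketch, hypothetical and keyed by a full filename, can hand the copying to the standard library's `shutil.copyfileobj`, which moves data in fixed-size chunks:

```python
import io
import os
import shutil
import tempfile


def write_from_stream(filename, stream):
    # Hypothetical local-file version: create parent directories as needed,
    # then copy the stream to disk chunk by chunk.
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    with open(filename, 'wb') as out:
        shutil.copyfileobj(stream, out)


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, 'my_dag', 'extract', 'out')
    write_from_stream(target, io.BytesIO(b'streamed data'))
    with open(target, 'rb') as f:
        written = f.read()
```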
-
-
exception fileflow.storage_drivers.storage_driver.StorageDriverError
Bases: exceptions.Exception
Base storage driver exception.
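A common use for a base exception like this is to translate backend-specific failures into one driver-level type. The sketch below defines a local stand-in class and a hypothetical `read_or_fail` helper; neither is fileflow code:

```python
class StorageDriverError(Exception):
    """Local stand-in for fileflow's StorageDriverError."""


def read_or_fail(files, filename):
    # Hypothetical helper: translate a backend-specific KeyError into the
    # driver-level exception, so callers only need to catch one type.
    try:
        return files[filename]
    except KeyError:
        raise StorageDriverError('No data stored at {}'.format(filename))
```

With this pattern, a caller could write `except StorageDriverError:` without caring whether the failure originated on local disk or in S3.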
fileflow.storage_drivers.file_storage_driver module
-
class fileflow.storage_drivers.file_storage_driver.FileStorageDriver(prefix)
Bases: fileflow.storage_drivers.storage_driver.StorageDriver
Read from and write to the local file system.
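To show how the `prefix` argument might anchor everything on disk, here is a simplified, hypothetical `LocalDriverSketch`. It assumes a `<prefix>/<dag_id>/<task_id>/<execution date>` layout and a date format that the real FileStorageDriver does not necessarily use:

```python
import os
import tempfile
from datetime import datetime


class LocalDriverSketch(object):
    # Hypothetical, simplified stand-in for FileStorageDriver: every file
    # lives under `prefix`, laid out as <prefix>/<dag_id>/<task_id>/<date>.
    def __init__(self, prefix):
        self.prefix = prefix

    def get_filename(self, dag_id, task_id, execution_date):
        return os.path.join(self.prefix, dag_id, task_id,
                            execution_date.strftime('%Y-%m-%dT%H:%M:%S'))

    def write(self, dag_id, task_id, execution_date, data, *args, **kwargs):
        filename = self.get_filename(dag_id, task_id, execution_date)
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(data)

    def read(self, dag_id, task_id, execution_date, encoding='utf-8'):
        filename = self.get_filename(dag_id, task_id, execution_date)
        with open(filename, encoding=encoding) as f:
            return f.read()


with tempfile.TemporaryDirectory() as tmp:
    driver = LocalDriverSketch(prefix=tmp)
    when = datetime(2016, 3, 1)
    driver.write('my_dag', 'extract', when, 'hello')
    result = driver.read('my_dag', 'extract', when)
    stored_name = os.path.basename(driver.get_filename('my_dag', 'extract', when))
```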
fileflow.storage_drivers.s3_storage_driver module
-
class fileflow.storage_drivers.s3_storage_driver.S3StorageDriver(access_key_id, secret_access_key, bucket_name)
Bases: fileflow.storage_drivers.storage_driver.StorageDriver
Read from and write to S3.
-
get_key_name(dag_id, task_id, execution_date)
Format the S3 key name for the given task instance.
Parameters: - dag_id (str) – The airflow DAG ID.
- task_id (str) – The airflow task ID.
- execution_date (datetime.datetime) – The execution date of the task instance.
Returns: The S3 key name.
Return type:
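A key-name sketch, assuming a hypothetical `<dag_id>/<task_id>/<execution date>` layout and date format. S3 keys are flat strings in which `/` is only a naming convention, so the parts are joined with `/` explicitly rather than with `os.path.join`, whose separator depends on the operating system:

```python
from datetime import datetime


def get_key_name(dag_id, task_id, execution_date):
    # Hypothetical layout; always '/'-separated regardless of the local OS.
    return '/'.join([dag_id, task_id,
                     execution_date.strftime('%Y-%m-%dT%H:%M:%S')])


print(get_key_name('my_dag', 'extract', datetime(2016, 3, 1)))
# → my_dag/extract/2016-03-01T00:00:00
```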
-
get_or_create_key(key_name)
Get a boto Key object with the given key name. If the key exists, return it; otherwise, create a new key.
Parameters: key_name (str) – The name of the S3 key.
Returns: A boto key object.
Return type: boto.s3.key.Key
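The get-or-create logic can be sketched against a boto 2 style bucket, whose `get_key()` returns `None` for a missing key and whose `new_key()` builds a local Key object that is only stored in S3 once contents are uploaded. To keep the example runnable without AWS, `FakeBucket` is a hypothetical in-memory stand-in for `boto.s3.bucket.Bucket`, and the function takes the bucket as an explicit argument:

```python
def get_or_create_key(bucket, key_name):
    # Reuse the existing key if there is one; otherwise create a new,
    # not-yet-uploaded key object.
    key = bucket.get_key(key_name)
    if key is None:
        key = bucket.new_key(key_name)
    return key


class FakeBucket(object):
    # Minimal in-memory stand-in for boto.s3.bucket.Bucket.
    def __init__(self, existing):
        self.existing = dict(existing)

    def get_key(self, key_name):
        return self.existing.get(key_name)

    def new_key(self, key_name):
        return 'new:' + key_name


bucket = FakeBucket({'my_dag/extract/a': 'existing:my_dag/extract/a'})
```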
-
list_filenames_in_path(path)
This requires some special treatment. Here, path is the full URL path. For boto/S3, the s3 protocol and the bucket name must be stripped out to leave only the key prefix.
Everywhere else, the path is kept in URL form, to mimic the full path used by the file system storage driver.
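The stripping step, followed by the non-recursive filtering, might be sketched as below. `url_to_prefix`, `filenames_from_keys` and the example URL are hypothetical; only `urllib.parse.urlparse` is a real library call:

```python
from urllib.parse import urlparse


def url_to_prefix(path):
    # 's3://my-bucket/my_dag/extract' -> 'my_dag/extract/'
    parsed = urlparse(path)
    if parsed.scheme != 's3':
        raise ValueError('Expected an s3:// URL, got {!r}'.format(path))
    prefix = parsed.path.lstrip('/')  # netloc (the bucket) is dropped
    # A trailing slash keeps 'extract' from also matching 'extract_v2/...'.
    if prefix and not prefix.endswith('/'):
        prefix += '/'
    return prefix


def filenames_from_keys(key_names, prefix):
    # Keep only keys directly under the prefix (non-recursive) and return
    # just the filename portion.
    names = []
    for key_name in key_names:
        rest = key_name[len(prefix):]
        if key_name.startswith(prefix) and rest and '/' not in rest:
            names.append(rest)
    return names


prefix = url_to_prefix('s3://my-bucket/my_dag/extract')
keys = ['my_dag/extract/2016-03-01', 'my_dag/extract/sub/x',
        'my_dag/extract_v2/y', 'other']
names = filenames_from_keys(keys, prefix)
```

With boto 2, the key names might come from `[k.name for k in bucket.list(prefix=prefix)]`; that call is a pointer only and is not exercised here.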
-