fileflow.storage_drivers package

Base StorageDriver class

class fileflow.storage_drivers.storage_driver.StorageDriver[source]

Bases: object

A base class for common functionality and API amongst the storage drivers.

This is an example of mocking the read method inside a DiveOperator:

# Requires: from mock import Mock (on Python 3: from unittest.mock import Mock)
# Inside a TestCase where self.sensor is a custom sensor

# Assign the desired input string to the return
# value of the 'read' method
attrs = {
     'read.return_value': 'output from earlier task'
}
self.sensor.storage = Mock(**attrs)

# Do stuff that will call storage.read()

# Later, we want to make sure the `read` method has been
# called the correct number of times
self.assertEqual(1, self.sensor.storage.read.call_count)
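
For a self-contained variant of the snippet above, the following sketch wraps the same mocking pattern in a runnable test case. `FakeSensor`, its `poke` method, and the DAG and task IDs are hypothetical stand-ins for a custom sensor, and the import assumes Python 3 (`unittest.mock`); on Python 2 the third-party `mock` package provides the same `Mock` class.

```python
import unittest
from datetime import datetime
from unittest.mock import Mock


class FakeSensor(object):
    """Hypothetical sensor that reads an upstream task's output via self.storage."""

    def __init__(self, storage):
        self.storage = storage

    def poke(self, execution_date):
        # Succeed only when the upstream task wrote non-empty output.
        data = self.storage.read("my_dag", "upstream_task", execution_date, "utf-8")
        return bool(data)


class FakeSensorTest(unittest.TestCase):
    def setUp(self):
        self.sensor = FakeSensor(storage=None)
        # Swap in a mock whose read() returns canned data.
        attrs = {"read.return_value": "output from earlier task"}
        self.sensor.storage = Mock(**attrs)

    def test_poke_reads_once(self):
        execution_date = datetime(2017, 1, 1)
        self.assertTrue(self.sensor.poke(execution_date))
        # read() should have been called exactly once, with these arguments.
        self.assertEqual(1, self.sensor.storage.read.call_count)
        self.sensor.storage.read.assert_called_once_with(
            "my_dag", "upstream_task", execution_date, "utf-8"
        )
```

Because the storage driver is replaced wholesale, the test never touches the file system or S3.
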
execution_date_string(execution_date)[source]

Format the execution date per our standard file naming convention.

Parameters: execution_date (datetime.datetime) – The airflow task instance execution date.
Returns: The formatted date string.
Return type: str

get_filename(dag_id, task_id, execution_date)[source]

Return an identifying path or URL to the file related to an airflow task instance.

Concrete storage drivers should implement this method.

Parameters:
  • dag_id (str) – The airflow DAG ID.
  • task_id (str) – The airflow task ID.
  • execution_date (datetime.datetime) – The datetime for the task instance.
Returns:

The identifying path or URL.

Return type:

str

get_path(dag_id, task_id)[source]

Return the path portion where files would be stored for the given task.

This should generally be the same as get_filename except without the filename (execution date) portion.

Parameters:
  • dag_id (str) – The DAG ID.
  • task_id (str) – The task ID.
Returns:

A path to the task’s intermediate storage.

Return type:

str
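
The relationship between get_path and get_filename described above can be sketched as follows. `SketchPathDriver` is a hypothetical class that does not subclass the real StorageDriver, and the `%Y-%m-%d` date format is an assumption made for illustration; the library's actual naming convention may differ.

```python
import posixpath
from datetime import datetime


class SketchPathDriver(object):
    """Hypothetical driver showing how get_path and get_filename can relate."""

    def __init__(self, prefix):
        self.prefix = prefix

    def execution_date_string(self, execution_date):
        # Assumed format for illustration only.
        return execution_date.strftime("%Y-%m-%d")

    def get_path(self, dag_id, task_id):
        # Directory-like portion shared by every run of the task.
        return posixpath.join(self.prefix, dag_id, task_id)

    def get_filename(self, dag_id, task_id, execution_date):
        # The task path plus a file name derived from the execution date.
        return posixpath.join(
            self.get_path(dag_id, task_id),
            self.execution_date_string(execution_date),
        )


driver = SketchPathDriver("/tmp/storage")
path = driver.get_path("my_dag", "my_task")
filename = driver.get_filename("my_dag", "my_task", datetime(2017, 1, 1))
print(path)      # /tmp/storage/my_dag/my_task
print(filename)  # /tmp/storage/my_dag/my_task/2017-01-01
```

Building get_filename on top of get_path keeps the two consistent, so stripping the last component of a filename always yields the task path.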

get_read_stream(dag_id, task_id, execution_date)[source]
Parameters:
  • dag_id (str) – The airflow DAG ID.
  • task_id (str) – The airflow task ID.
  • execution_date (datetime.datetime) – The datetime for the task instance.
Returns:

list_filenames_in_path(path)[source]

Given a storage path, get all of the filenames of files directly in that path.

Note that this only provides names of files and is not recursive.

Parameters: path (str) – The storage path to list.
Returns: A list containing only the filename portion of each file in the path.
Return type: list[str]

list_filenames_in_task(dag_id, task_id)[source]

Shortcut method to get a list of filenames stored in the given task’s path.

The base class already implements this in terms of the unimplemented methods above. Override it if you need more flexibility.

Parameters:
  • dag_id (str) – The DAG ID of the task.
  • task_id (str) – The task ID.
Returns:

A list of file names of files stored by the task.

Return type:

list[str]
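
One way such a shortcut could be composed is sketched below, assuming it resolves the task path with get_path and then lists it with list_filenames_in_path. `SketchListingDriver` and its in-memory `files` dictionary are hypothetical and are not part of the library.

```python
class SketchListingDriver(object):
    """Hypothetical in-memory driver; keys are full paths, values are contents."""

    def __init__(self, files):
        self.files = files

    def get_path(self, dag_id, task_id):
        return "/".join(["mem", dag_id, task_id])

    def list_filenames_in_path(self, path):
        # Keep only files directly under `path`; nested files are skipped.
        prefix = path.rstrip("/") + "/"
        names = []
        for full_path in self.files:
            if full_path.startswith(prefix):
                remainder = full_path[len(prefix):]
                if remainder and "/" not in remainder:
                    names.append(remainder)
        return sorted(names)

    def list_filenames_in_task(self, dag_id, task_id):
        # Assumed composition: resolve the task path, then list it.
        return self.list_filenames_in_path(self.get_path(dag_id, task_id))


driver = SketchListingDriver({
    "mem/my_dag/my_task/2017-01-01": "a",
    "mem/my_dag/my_task/2017-01-02": "b",
    "mem/my_dag/my_task/nested/2017-01-03": "c",
    "mem/my_dag/other_task/2017-01-01": "d",
})
names = driver.list_filenames_in_task("my_dag", "my_task")
print(names)  # ['2017-01-01', '2017-01-02']
```

The nested file and the file belonging to another task are both excluded, matching the non-recursive behaviour described for list_filenames_in_path.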

read(dag_id, task_id, execution_date, encoding)[source]

Read the data output from the given airflow task instance.

Concrete storage drivers should implement this method.

Parameters:
  • dag_id (str) – The airflow DAG ID.
  • task_id (str) – The airflow task ID.
  • execution_date (datetime.datetime) – The datetime for the task instance.
  • encoding (str) – The encoding to use for reading in the data.
Returns:

The data from the file.

Return type:

str

write(dag_id, task_id, execution_date, data, *args, **kwargs)[source]

Write data to the output file identified by the airflow task instance.

Concrete storage drivers should implement this method.

Parameters:
  • dag_id (str) – The airflow DAG ID.
  • task_id (str) – The airflow task ID.
  • execution_date (datetime.datetime) – The datetime for the task instance.
  • data (str) – The data to write.
  • args – May be used in child classes (currently used in S3StorageDriver).
  • kwargs – Same reasoning as args.

write_from_stream(dag_id, task_id, execution_date, stream, *args, **kwargs)[source]

Write data from a stream to the output file identified by the airflow task instance.

Concrete storage drivers should implement this method.

Parameters:
  • dag_id (str) – The airflow DAG ID.
  • task_id (str) – The airflow task ID.
  • execution_date (datetime.datetime) – The datetime for the task instance.
  • stream (stream) – A stream to the data to write.
  • args – May be used in child classes (currently used in S3StorageDriver).
  • kwargs – Same reasoning as args.
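
As a rough illustration of the difference between writing a string and writing from a stream, the sketch below copies a binary stream to a local file in chunks and reads it back with an explicit encoding. The two module-level helpers are hypothetical; they take a plain filename rather than the dag_id, task_id, and execution_date arguments of the real methods, and they assume Python 3.

```python
import io
import os
import shutil
import tempfile


def write_from_stream(filename, stream):
    """Copy a binary stream to `filename` in chunks (hypothetical helper)."""
    with open(filename, "wb") as output:
        # copyfileobj reads and writes in blocks, so large inputs are never
        # held in memory all at once.
        shutil.copyfileobj(stream, output)


def read(filename, encoding="utf-8"):
    """Read the whole file back and decode it with `encoding`."""
    with io.open(filename, "r", encoding=encoding) as source:
        return source.read()


storage_dir = tempfile.mkdtemp()
filename = os.path.join(storage_dir, "2017-01-01")

payload = "caf\u00e9 from a stream"
write_from_stream(filename, io.BytesIO(payload.encode("utf-8")))
result = read(filename, encoding="utf-8")
print(result == payload)  # True

shutil.rmtree(storage_dir)  # clean up the temporary storage directory
```
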
exception fileflow.storage_drivers.storage_driver.StorageDriverError[source]

Bases: exceptions.Exception

Base storage driver Exception.

fileflow.storage_drivers.file_storage_driver module

class fileflow.storage_drivers.file_storage_driver.FileStorageDriver(prefix)[source]

Bases: fileflow.storage_drivers.storage_driver.StorageDriver

Read and write to the local file system.

check_or_create_dir(dir)[source]

Make sure our storage location exists.

Parameters: dir (str) – The directory name to look for and create if it doesn’t exist.
Returns:
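
A minimal sketch of the check-or-create behaviour, assuming Python 3.2 or later for the `exist_ok` flag. The standalone function below is illustrative only and is not the driver's actual implementation.

```python
import os
import shutil
import tempfile


def check_or_create_dir(directory):
    """Create `directory`, including missing parents, unless it already exists."""
    # exist_ok=True makes the call safe to repeat.
    os.makedirs(directory, exist_ok=True)


root = tempfile.mkdtemp()
task_dir = os.path.join(root, "my_dag", "my_task")

check_or_create_dir(task_dir)  # creates my_dag/ and my_dag/my_task/
check_or_create_dir(task_dir)  # second call is a no-op
created = os.path.isdir(task_dir)
print(created)  # True

shutil.rmtree(root)  # clean up the temporary directory
```
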
get_filename(dag_id, task_id, execution_date)[source]
get_path(dag_id, task_id)[source]
get_read_stream(dag_id, task_id, execution_date)[source]
list_filenames_in_path(path)[source]
read(dag_id, task_id, execution_date, encoding='utf-8')[source]
write(dag_id, task_id, execution_date, data, *args, **kwargs)[source]
write_from_stream(dag_id, task_id, execution_date, stream, *args, **kwargs)[source]

fileflow.storage_drivers.s3_storage_driver module

class fileflow.storage_drivers.s3_storage_driver.S3StorageDriver(access_key_id, secret_access_key, bucket_name)[source]

Bases: fileflow.storage_drivers.storage_driver.StorageDriver

Read and write to S3.

get_filename(dag_id, task_id, execution_date)[source]
get_key_name(dag_id, task_id, execution_date)[source]

Formats the S3 key name for the given task instance.

Parameters:
  • dag_id (str) – The airflow DAG ID.
  • task_id (str) – The airflow task ID.
  • execution_date (datetime.datetime) – The execution date of the task instance.
Returns:

The S3 key name.

Return type:

str

get_or_create_key(key_name)[source]

Get a boto Key object with the given key name. If the key exists, it is returned; otherwise a new key is created.

Parameters: key_name (str) – The name of the S3 key.
Returns: A boto key object.
Return type: boto.s3.key.Key

get_path(dag_id, task_id)[source]
get_read_stream(dag_id, task_id, execution_date)[source]
list_filenames_in_path(path)[source]

This requires some special treatment. The path here is the full URL path. For boto/S3 we need to strip out the s3 protocol and bucket name to get only the prefix.

Everywhere else, the path is kept in URL form to mimic the full path used by the file system storage driver.
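
The stripping step can be sketched with the standard library alone. This assumes paths of the form `s3://bucket/prefix` and Python 3; `split_s3_url` and `filenames_directly_under` are hypothetical helpers, and the list of key names stands in for whatever a bucket listing would return.

```python
from urllib.parse import urlparse


def split_s3_url(path):
    """Split an s3://bucket/prefix URL into (bucket, key prefix)."""
    parsed = urlparse(path)
    if parsed.scheme != "s3":
        raise ValueError("expected an s3:// URL, got %r" % (path,))
    # netloc holds the bucket; the prefix is the path minus its leading slash.
    prefix = parsed.path.lstrip("/")
    # A trailing slash keeps "my_task" from also matching "my_task_2".
    if prefix and not prefix.endswith("/"):
        prefix += "/"
    return parsed.netloc, prefix


def filenames_directly_under(prefix, key_names):
    """Return the file names of keys directly under `prefix` (non-recursive)."""
    names = []
    for key_name in key_names:
        if key_name.startswith(prefix):
            remainder = key_name[len(prefix):]
            if remainder and "/" not in remainder:
                names.append(remainder)
    return names


bucket, prefix = split_s3_url("s3://my-bucket/my_dag/my_task")
keys = [
    "my_dag/my_task/2017-01-01",
    "my_dag/my_task/sub/2017-01-02",
    "my_dag/my_task_2/2017-01-01",
]
names = filenames_directly_under(prefix, keys)
print(bucket, prefix, names)  # my-bucket my_dag/my_task/ ['2017-01-01']
```
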

read(dag_id, task_id, execution_date, encoding='utf-8')[source]
write(dag_id, task_id, execution_date, data, content_type='text/plain', *args, **kwargs)[source]

Note that content_type is an argument that is not in the parent method.

Parameters: content_type (string) – The content type. If None, no content type is set.
write_from_stream(dag_id, task_id, execution_date, stream, content_type='text/plain', *args, **kwargs)[source]
Parameters: content_type (string|None) – The content type. Pass None to leave it unset.
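
The "None means unset" convention for content_type might look like the sketch below. `build_upload_headers` is a hypothetical helper; how the driver actually passes the content type to boto is not shown in this reference.

```python
def build_upload_headers(content_type="text/plain"):
    """Return the headers to send with an upload (hypothetical helper)."""
    headers = {}
    # None means "leave the content type unset", so no header is added.
    if content_type is not None:
        headers["Content-Type"] = content_type
    return headers


default_headers = build_upload_headers()
json_headers = build_upload_headers("application/json")
unset_headers = build_upload_headers(None)
print(default_headers)  # {'Content-Type': 'text/plain'}
print(json_headers)     # {'Content-Type': 'application/json'}
print(unset_headers)    # {}
```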