Fileflow Overview

Fileflow is a collection of modules that support data transfer between Airflow tasks via file targets and dependencies, backed by either the local file system or S3. The concept is inherited from other pipelining systems such as Make, Drake, Pydoit, and Luigi, which organize pipeline dependencies around file targets. In some ways this is an alternative to Airflow’s XCom system, but it supports arbitrarily large and arbitrarily formatted data, whereas XCom can only carry a pickle as large as the backend database’s BLOB (binary large object) implementation allows.

Installation

Fileflow has been tested on Python 2.7 and Airflow version 1.7.0.

You can install from GitHub using pip:

pip install git+https://github.com/industrydive/fileflow.git#egg=fileflow

Or build from source by downloading the archive from GitHub, unzipping it, navigating to the root directory of the project, and running setup.py:

python setup.py install

Concepts

The main components of fileflow are:

  • an operator superclass for your operators and sensors that works with multiple inheritance (DiveOperator)
  • a task logic superclass that exposes the storage backend and convenience methods to serialize different data formats to text (TaskRunner)
  • two storage drivers that handle the nitty gritty of passing your serialized data to either the local file system or S3 (FileStorageDriver or S3StorageDriver)

DiveOperator

The DiveOperator is a subclass of airflow.models.BaseOperator that mixes in the basic functionality that lets operators declare which tasks’ data they depend on. You can subclass your own operators or sensors from DiveOperator exclusively, or mix it in via multiple inheritance with other existing operators to add the data dependency feature. For example, if you want to use the existing airflow.operators.PythonOperator and mix in fileflow’s file targeting feature, you could define your derived class as:

class DerivedOperator(DiveOperator, PythonOperator):
    ...

Given that definition, you can specify a given task’s dependency on data output from an upstream task like so:

a_task = DerivedOperator(
    data_dependencies={"dependency_key": "task_id_for_dependent_task"},
    ...
)

Fileflow ships with exactly this type of derived operator, which we call DivePythonOperator, that works directly with the second component of fileflow, the TaskRunner, to make file detection of upstream targets easy.
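
The mix-in pattern described above can be sketched without Airflow installed. The classes below (BaseOp, PythonOp, DependencyMixin, DerivedOp) are hypothetical stand-ins, not Airflow's or fileflow's real classes. The sketch only illustrates how a class listed first among the bases can consume a data_dependencies keyword and pass the remaining keywords along through cooperative super() calls:

```python
class BaseOp(object):
    """Stand-in for airflow.models.BaseOperator (hypothetical, simplified)."""

    def __init__(self, task_id, **kwargs):
        self.task_id = task_id


class PythonOp(BaseOp):
    """Stand-in for PythonOperator (hypothetical, simplified)."""

    def __init__(self, python_callable=None, **kwargs):
        super(PythonOp, self).__init__(**kwargs)
        self.python_callable = python_callable


class DependencyMixin(BaseOp):
    """Stand-in for DiveOperator: records which upstream tasks supply data."""

    def __init__(self, data_dependencies=None, **kwargs):
        # Consume our own keyword, then hand everything else up the MRO.
        super(DependencyMixin, self).__init__(**kwargs)
        self.data_dependencies = data_dependencies or {}


class DerivedOp(DependencyMixin, PythonOp):
    """Gets both behaviours; the mix-in comes first in the base list."""
    pass


task = DerivedOp(
    task_id="downstream",
    python_callable=len,
    data_dependencies={"dependency_key": "upstream_task_id"},
)
```

Listing the mix-in first means its __init__ runs first and strips data_dependencies before the other base class sees the keyword arguments.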

TaskRunner

The TaskRunner is a superclass you subclass to define your task logic, which goes in your subclass’s run() method. Many of the Airflow examples set up the task logic as plain functions that the PythonOperator calls. Our DivePythonOperator instead expects the class name of a subclass of TaskRunner and, during operator execution, calls that class’s run() method, which should contain the actual logic of the task. To make this concrete, compare the basic PythonOperator signature with our DivePythonOperator signature:

# vanilla airflow PythonOperator
normal_task = PythonOperator(
                task_id="some_unique_task_id",
                python_callable=a_function_name,
                dag=dag)

# fancy DivePythonOperator
fancy_task = DivePythonOperator(
                task_id="some_other_unique_task_id",
                python_object=AClassNameThatSubclassesTaskRunnerAndHasARunMethod,
                data_dependencies={"important_data": normal_task.task_id},
                dag=dag)

If you’re a snowflake and don’t like calling your main logic wrapper run() and, for example, want to call it ninja_move(), you can configure that on the operator in the DAG:

# fancy DivePythonOperator for someone who wants to be unique
fancy_and_unique_task = DivePythonOperator(
                task_id="yet_another_unique_task_id",
                python_object=AClassNameThatSubclassesTaskRunnerAndHasANinjaMoveMethod,
                python_method="ninja_move",
                data_dependencies={"important_data": normal_task.task_id},
                dag=dag)
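
One way an operator could support a configurable method name is to instantiate the class with the task context and look the method up by name. This is a sketch under that assumption, not fileflow's actual implementation; FakeRunner, NinjaRunner, and execute_runner are hypothetical names:

```python
class FakeRunner(object):
    """Hypothetical stand-in for a TaskRunner subclass."""

    def __init__(self, context):
        self.context = context

    def run(self, *args, **kwargs):
        return "run called for {}".format(self.context["task_id"])


class NinjaRunner(FakeRunner):
    """Same stand-in, with its main logic under a different method name."""

    def ninja_move(self, *args, **kwargs):
        return "ninja_move called for {}".format(self.context["task_id"])


def execute_runner(python_object, context, python_method="run"):
    """Instantiate the runner class, then call the named method on it."""
    runner = python_object(context)
    # getattr raises AttributeError if the class has no such method.
    method = getattr(runner, python_method)
    return method()


default_result = execute_runner(FakeRunner, {"task_id": "a"})
custom_result = execute_runner(
    NinjaRunner, {"task_id": "b"}, python_method="ninja_move"
)
```

Defaulting python_method to "run" is what lets most DAG definitions omit the argument entirely.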

All of this takes advantage of the work we’ve done in TaskRunner so that it can easily pass Airflow-specific details forward to the storage driver, which uses them to determine where the task should write its target or where its upstream tasks wrote theirs. We’ve also written several serialization methods into TaskRunner that handle different data formats, such as JSON, pandas DataFrames, and bytestreams, for convenience. The idea is that by the time the TaskRunner hands data to the appropriate storage driver, the data is already serialized into a single str representation or BytesIO object.
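
The idea of serializing before handing data to a storage driver can be sketched with the standard library alone. The helper names serialize_json and serialize_bytes are hypothetical, not TaskRunner's real method names; the point is only that the driver ends up receiving either a single text string or a BytesIO object:

```python
import io
import json


def serialize_json(data):
    """Turn a JSON-compatible object into a single text string."""
    return json.dumps(data, sort_keys=True)


def serialize_bytes(raw):
    """Wrap raw bytes in a file-like BytesIO object."""
    return io.BytesIO(raw)


text_payload = serialize_json({"rows": 2, "name": "example"})
binary_payload = serialize_bytes(b"\x00\x01\x02")
```

Keeping serialization in the task layer means a driver only has to deal with two kinds of payload, whatever the original data looked like.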

Storage drivers

The two storage drivers shipped in fileflow deal with the nitty gritty of actually communicating with either the local file system in the case of FileStorageDriver, or with an S3 bucket in the case of S3StorageDriver. The storage driver needs to be able to

  • derive a path or key name or names from the Airflow TaskInstance context data passed through by the TaskRunner for either upstream tasks (data dependencies) or the current task’s target
  • read and write to that path or key name
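
Both requirements can be sketched for a local file system. The layout used here, prefix/dag_id/task_id/execution date, mirrors the directory tree shown in the full example, but LocalDriverSketch and its method names are hypothetical and are not FileStorageDriver's actual API:

```python
import io
import os
import tempfile
from datetime import datetime


class LocalDriverSketch(object):
    """Hypothetical local-file driver; not fileflow's FileStorageDriver."""

    def __init__(self, prefix):
        self.prefix = prefix

    def get_filename(self, dag_id, task_id, execution_date):
        # One file per DAG, task, and execution date.
        return os.path.join(
            self.prefix, dag_id, task_id, execution_date.strftime("%Y-%m-%d")
        )

    def write(self, dag_id, task_id, execution_date, text):
        path = self.get_filename(dag_id, task_id, execution_date)
        directory = os.path.dirname(path)
        if not os.path.isdir(directory):
            os.makedirs(directory)
        with io.open(path, "w", encoding="utf-8") as handle:
            handle.write(text)
        return path

    def read(self, dag_id, task_id, execution_date):
        path = self.get_filename(dag_id, task_id, execution_date)
        with io.open(path, "r", encoding="utf-8") as handle:
            return handle.read()


# Write a target for one task run, then read it back.
driver = LocalDriverSketch(tempfile.mkdtemp())
run_date = datetime(2017, 1, 1)
written_path = driver.write("fileflow_example", "write_a_file", run_date, u"hello")
round_trip = driver.read("fileflow_example", "write_a_file", run_date)
```

A downstream task only needs the upstream task's id and the shared execution date to reconstruct the same path and read the file.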

Because we’re working with text I/O, the storage drivers have to make a number of decisions about encodings and charsets, file read/write modes, path or key existence, and, when putting to S3 over HTTP, content types. All of this is handled by the respective storage driver; the interface a storage driver should implement is defined by the base StorageDriver class.
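
To illustrate what a shared driver interface with an explicit encoding decision might look like, here is a minimal sketch. StorageDriverSketch and InMemoryDriver are hypothetical, and their method names are assumptions rather than the real StorageDriver interface; the in-memory backend stands in for a file system or bucket so the example runs anywhere:

```python
class StorageDriverSketch(object):
    """Hypothetical interface; method names are illustrative only."""

    def get_path(self, dag_id, task_id, execution_date):
        raise NotImplementedError

    def read(self, dag_id, task_id, execution_date):
        raise NotImplementedError

    def write(self, dag_id, task_id, execution_date, text):
        raise NotImplementedError


class InMemoryDriver(StorageDriverSketch):
    """Toy backend that stores encoded bytes in a dict keyed by path."""

    def __init__(self, encoding="utf-8"):
        self.encoding = encoding
        self.blobs = {}

    def get_path(self, dag_id, task_id, execution_date):
        return "/".join([dag_id, task_id, execution_date])

    def read(self, dag_id, task_id, execution_date):
        key = self.get_path(dag_id, task_id, execution_date)
        if key not in self.blobs:
            # Surface a missing target explicitly, with the key in the message.
            raise KeyError("No target stored at {}".format(key))
        return self.blobs[key].decode(self.encoding)

    def write(self, dag_id, task_id, execution_date, text):
        key = self.get_path(dag_id, task_id, execution_date)
        # The driver, not the task, decides how text becomes bytes.
        self.blobs[key] = text.encode(self.encoding)


memory_driver = InMemoryDriver()
memory_driver.write("dag", "task", "2017-01-01", u"caf\u00e9")
stored_bytes = memory_driver.blobs["dag/task/2017-01-01"]
restored_text = memory_driver.read("dag", "task", "2017-01-01")
```

Because every backend answers the same three calls, task code does not need to know which one is in use.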

A full example

import logging
from datetime import datetime, timedelta

from airflow import DAG

from fileflow.operators import DivePythonOperator
from fileflow.task_runners import TaskRunner


# Define the logic for your tasks as classes that subclass TaskRunner
# By doing so it will have access to TaskRunner's convenience methods to read and write files

# Here's an easy one that just writes a file.
class TaskRunnerExample(TaskRunner):
    def run(self, *args, **kwargs):
        output_string = "This task -- called {} -- was run.".format(self.task_instance.task_id)
        self.write_file(output_string)
        logging.info("Wrote '{}' to '{}'".format(output_string, self.get_output_filename()))


# Here's a more complicated one that will read the file from its upstream task, do something, and then write its own file
# It also shows you how to override __init__ if you need to
class TaskRunnerReadExample(TaskRunner):
    def __init__(self, context):
        """
        An example of how to write the init on a class derived from TaskRunner
        :param context: Required.
        """
        super(TaskRunnerReadExample, self).__init__(context)
        self.output_template = "Read '{}' from '{}'. Writing output to '{}'."

    def run(self, *args, **kwargs):
        # This is how you read the output of a previous task
        # The argument to read_upstream_file is based on the DAG configuration
        input_string = self.read_upstream_file("something")

        # An example bit of 'logic'
        output_string = self.output_template.format(
            input_string,
            self.get_input_filename("something"),
            self.get_output_filename()
        )

        # And write out the results of the logic to the correct file
        self.write_file(output_string)

        logging.info(output_string)


# Now let's define a DAG
dag = DAG(
    dag_id='fileflow_example',
    start_date=datetime(2030, 1, 1),
    schedule_interval=timedelta(minutes=1)
)

# The tasks in this DAG will use DivePythonOperator as the operator,
# which knows how to send a TaskRunner anything in the `data_dependencies` keyword
# so you can specify more than one file by name to be fed to a downstream task
t1 = DivePythonOperator(
    task_id="write_a_file",
    python_method="run",
    python_object=TaskRunnerExample,
    provide_context=True,
    owner="airflow",
    dag=dag
)

# We COULD set `python_method="run"` here as above, but "run" is the
# default value, so we're not bothering to set it
t2 = DivePythonOperator(
    task_id="read_that_file",
    python_object=TaskRunnerReadExample,
    data_dependencies={"something": t1.task_id},
    # remember how our TaskRunner subclass knows how to read the upstream file with the key 'something'? This is why
    provide_context=True,
    owner="airflow",
    dag=dag
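)

Running both tasks from the command line and then inspecting the storage directory looks like this:

(fileflow)fileflow $ airflow run -sd fileflow/example_dags/ fileflow_example write_a_file 2017-01-01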
[2017-01-18 16:54:55,245] {__init__.py:36} INFO - Using executor SequentialExecutor
Sending to executor.
[2017-01-18 16:54:56,080] {__init__.py:36} INFO - Using executor SequentialExecutor
Logging into: /Users/llorenz/airflow/logs/fileflow_example/write_a_file/2017-01-01T00:00:00
[2017-01-18 16:54:56,995] {__init__.py:36} INFO - Using executor SequentialExecutor
(fileflow)fileflow $ airflow run -sd fileflow/example_dags/ fileflow_example read_that_file 2017-01-01
[2017-01-18 16:55:30,790] {__init__.py:36} INFO - Using executor SequentialExecutor
Sending to executor.
[2017-01-18 16:55:32,219] {__init__.py:36} INFO - Using executor SequentialExecutor
Logging into: /Users/llorenz/airflow/logs/fileflow_example/read_that_file/2017-01-01T00:00:00
(fileflow)fileflow $ tree storage/
storage/
└── fileflow_example
    ├── read_that_file
    │   └── 2017-01-01
    └── write_a_file
        └── 2017-01-01

3 directories, 2 files
(fileflow)fileflow $ cat storage/fileflow_example/read_that_file/2017-01-01
Read 'This task -- called write_a_file -- was run.' from 'storage/fileflow_example/write_a_file/2017-01-01'. Writing output to 'storage/fileflow_example/read_that_file/2017-01-01'.
(fileflow)fileflow $ cat storage/fileflow_example/write_a_file/2017-01-01
This task -- called write_a_file -- was run.
(fileflow)fileflow $