As an intern at Cask this summer, I had the opportunity to work on Cask’s flagship product: CDAP. CDAP (Cask Data Application Platform) helps you build big-data applications on a layer of abstraction over a distributed environment. You can develop CDAP applications and test them locally on your computer, then package the application and run it on a distributed environment. In short, you can use MapReduce and Spark and take advantage of the resources of a cluster while knowing almost nothing about the distributed environment underneath. This summer, I implemented program status scheduling on the platform, which allows users to trigger a program based on the status of another program, while passing event data between the program runs.

Use Case

There are many use cases for this feature. By introducing program dependencies, we can, in turn, have smaller programs that focus on completing one part of a greater data application process. For instance, we can build a CDAP pipeline to download a feed from FTP and perform some initial pre-processing, storing it into a CDAP dataset. If this process succeeds, then we would like to run a Spark job to process the dataset and output it to an S3 bucket.

With this feature, the process can be split into two pipelines: the Spark processing pipeline runs after the FTP download pipeline completes, and only if it completes successfully.

Designing

One of the unique aspects of my internship at Cask was the design process. Design played a very central role on this project, and on all projects at Cask. I spent a lot of time understanding the lifecycle of programs on CDAP in both standalone and distributed environments, as well as how programs could already be scheduled in other ways (by time, or by the arrival of a new partition in a dataset).

After discussing the design with others, it became clear that a core piece was notification: every change in a program's status needs to be published as a notification. When saving the program status in storage, we should also check whether a schedule has been created that depends on that program status. In other words, we were thinking of something like this:

The high-level approach (highly simplified) is pretty straightforward:

- A notification arrives.
- Check in a queue if it satisfies a schedule.
- Remove it from the queue, check if any constraints on that schedule are satisfied (time of day, concurrent runs, etc).
- Start the program when those constraints are satisfied, if they’re satisfied.
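The steps above can be sketched in a few lines of Java. This is a minimal sketch under one reading of the list, in which a matching notification puts a job on the queue and the constraint check happens when the job is taken off again. The class and method names (`StatusTriggerSketch`, `Schedule`, `onNotification`, `runPendingJobs`) are hypothetical and are not CDAP's scheduler API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.BooleanSupplier;

// All names here are hypothetical; this is not CDAP's scheduler API.
class StatusTriggerSketch {
    enum ProgramStatus { COMPLETED, FAILED, KILLED }

    // "Program X finished with status S."
    static class Notification {
        final String programId;
        final ProgramStatus status;

        Notification(String programId, ProgramStatus status) {
            this.programId = programId;
            this.status = status;
        }
    }

    // Start targetProgram when triggerProgram reaches triggerStatus and the constraint holds.
    static class Schedule {
        final String triggerProgram;
        final ProgramStatus triggerStatus;
        final String targetProgram;
        final BooleanSupplier constraint; // e.g. time of day, concurrent-run limit

        Schedule(String triggerProgram, ProgramStatus triggerStatus,
                 String targetProgram, BooleanSupplier constraint) {
            this.triggerProgram = triggerProgram;
            this.triggerStatus = triggerStatus;
            this.targetProgram = targetProgram;
            this.constraint = constraint;
        }

        boolean isSatisfiedBy(Notification n) {
            return triggerProgram.equals(n.programId) && triggerStatus == n.status;
        }
    }

    private final List<Schedule> schedules = new ArrayList<>();
    private final Queue<Schedule> jobQueue = new ArrayDeque<>();
    private final List<String> startedPrograms = new ArrayList<>();

    void addSchedule(Schedule schedule) {
        schedules.add(schedule);
    }

    // A notification arrives: queue a job for every schedule it satisfies.
    void onNotification(Notification n) {
        for (Schedule s : schedules) {
            if (s.isSatisfiedBy(n)) {
                jobQueue.add(s);
            }
        }
    }

    // Take each queued job off the queue once. Start it if its constraint
    // holds; otherwise put it back so a later pass can try again.
    void runPendingJobs() {
        int pending = jobQueue.size();
        for (int i = 0; i < pending; i++) {
            Schedule s = jobQueue.remove();
            if (s.constraint.getAsBoolean()) {
                startedPrograms.add(s.targetProgram); // stand-in for starting the program
            } else {
                jobQueue.add(s);
            }
        }
    }

    List<String> startedPrograms() { return startedPrograms; }

    int queuedJobs() { return jobQueue.size(); }
}
```

In the FTP example from earlier, a schedule with trigger program `ftpDownload`, trigger status `COMPLETED`, and target `sparkJob` would ignore a `FAILED` notification entirely, and would hold a `COMPLETED` one in the queue until its constraint allowed the Spark job to start.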

A big part of the design was understanding the entire lifecycle of a program on our platform. All that work was consolidated into a diagram like the one below:

The Problems

But it wasn’t that simple. We didn’t have a way of sending notifications that a program was done. The design told us we needed to refactor first. The refactoring work served two purposes:

  1. We needed to unify how program states were recorded, so that the recorded state accurately reflects the actual state of the program. Remember that CDAP works both on a standalone computer and in a distributed environment - the lifecycle of programs is very different in the two environments, and we needed consistent states in both.

  2. We needed to use an internal transactional messaging system (think Kafka, but transactional/consistent, and durable in case services go down) to guarantee that these notifications would arrive exactly once, in order, and be visible to any internal part of CDAP. It also had to be possible to send these notifications from anywhere in CDAP.

Improving program state recording:

Let me talk a bit about the refactoring work, which became a bit more involved than expected:

Unifying program states was tricky. We needed separate ways of recording program states in local mode and in distributed mode. Local mode was relatively easy: each program has a runner, which runs the program, and a controller. We simply added a listener to each controller to watch for state changes and react to them. In distributed mode, however, that approach was inconsistent and responded too late. Instead, we needed to record program states in the Application Master, which is responsible for handing out containers/resources to the programs. This is the most accurate and reasonable place where the state of any program is known.
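The local-mode approach is essentially the observer pattern. A minimal sketch, assuming a hypothetical `Controller` class with an `addListener` method (a stand-in for illustration, not CDAP's actual controller class):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical names; a stand-in for the runner/controller pair described above.
class ControllerListenerSketch {
    enum State { STARTING, RUNNING, COMPLETED, FAILED }

    static class Controller {
        private final String runId;
        private final List<Consumer<State>> listeners = new ArrayList<>();
        private State state = State.STARTING;

        Controller(String runId) {
            this.runId = runId;
        }

        String runId() { return runId; }

        State state() { return state; }

        void addListener(Consumer<State> listener) {
            listeners.add(listener);
        }

        // Called by the runner whenever the program moves to a new state;
        // every registered listener is told about the change.
        void transitionTo(State next) {
            state = next;
            for (Consumer<State> listener : listeners) {
                listener.accept(next);
            }
        }
    }

    // Attach a listener that records each state change as "runId:STATE".
    // The returned list is a stand-in for the real program state store.
    static List<String> recordStates(Controller controller) {
        List<String> recorded = new ArrayList<>();
        controller.addListener(s -> recorded.add(controller.runId() + ":" + s));
        return recorded;
    }
}
```

The listener only sees transitions that pass through the controller, which is why this works when the controller lives in the same process as the program and is harder to rely on when the program runs in remote containers.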

Using a transactional messaging system to record program states was also more difficult than we thought, for other reasons. Publishing messages was easy, but consuming them efficiently was harder. We needed to build a message fetcher that would consume notifications in batches and persist the program states transactionally. Once we did all that, we needed to fix all of the tests to correctly check for the program status, as these notifications were a cleaner, more reliable, but slightly slower way of updating program states.
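To illustrate what consuming in batches and persisting transactionally means here, the following is a minimal in-memory sketch. It assumes a list standing in for the topic and a map standing in for the store, and every name in it (`BatchFetcherSketch`, `fetchAndPersist`, and so on) is hypothetical. The point is that the state updates and the consumer offset are committed together, so a failure partway through a batch leaves both untouched.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, in-memory stand-ins; not the messaging system used by CDAP.
class BatchFetcherSketch {
    static class Message {
        final String runId;
        final String state;

        Message(String runId, String state) {
            this.runId = runId;
            this.state = state;
        }
    }

    private final List<Message> topic = new ArrayList<>(); // ordered message log
    private Map<String, String> store = new HashMap<>();   // runId -> latest state
    private int nextOffset = 0;                             // committed with the store

    void publish(String runId, String state) {
        topic.add(new Message(runId, state));
    }

    // Consume up to batchSize messages and apply all of them or none of them.
    // Returns how many messages were consumed.
    int fetchAndPersist(int batchSize) {
        int end = Math.min(topic.size(), nextOffset + batchSize);
        Map<String, String> staged = new HashMap<>(store); // work on a copy
        for (int i = nextOffset; i < end; i++) {
            Message m = topic.get(i);
            if (m.state == null) {
                // Abort before anything is committed.
                throw new IllegalStateException("malformed message at offset " + i);
            }
            staged.put(m.runId, m.state);
        }
        int consumed = end - nextOffset;
        // "Commit": the new states and the new offset become visible together.
        store = staged;
        nextOffset = end;
        return consumed;
    }

    String stateOf(String runId) { return store.get(runId); }

    int nextOffset() { return nextOffset; }
}
```

Because the offset only advances on commit, a failed batch is simply retried from the same position, which is one way to avoid applying a notification twice or skipping it.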

With notifications being published and consumed internally by the platform, it became much easier to implement the original goal and follow the diagram: scheduling programs.

At this point, a thread can subscribe to the topic on the messaging system and persist program state changes to the store. Scheduling can also subscribe to the same topic and schedule jobs.
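A small sketch of why one topic can serve both consumers: if each subscriber keeps its own cursor into the ordered log, the state persister and the scheduler each see every notification, independently of how fast the other one reads. The `Topic` and `Subscriber` classes are hypothetical in-memory stand-ins, not the real messaging API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-ins for a durable, ordered topic and its consumers.
class SharedTopicSketch {
    static class Topic {
        private final List<String> messages = new ArrayList<>();

        void publish(String message) {
            messages.add(message);
        }
    }

    // Each subscriber tracks its own position, so one consumer never
    // takes messages away from another.
    static class Subscriber {
        private final Topic topic;
        private int cursor = 0;
        final List<String> seen = new ArrayList<>();

        Subscriber(Topic topic) {
            this.topic = topic;
        }

        // Read everything published since the last poll, in order.
        // Returns the number of new messages read.
        int poll() {
            int start = cursor;
            while (cursor < topic.messages.size()) {
                seen.add(topic.messages.get(cursor));
                cursor++;
            }
            return cursor - start;
        }
    }
}
```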

Conclusion

I learned a lot at Cask: about all of the buzzwords around big data, about design, and about open source software. Dealing with a distributed environment, multi-threaded code, and race conditions made this a much more challenging internship than, say, classic web development. I definitely have a greater appreciation for Java now.

Check out the design docs I wrote:

  1. Program status scheduling: https://wiki.cask.co/display/CE/Program+Status+Based+Scheduling
  2. Refining program states: https://wiki.cask.co/display/CE/Refined+Program+States