Homework 12: Scheduling with Dependencies 美国留学生作业assignment答案



About This Homework   关于这个家作

The homework consists of 4 questions, for a total of 90 points. 这份家作由4个问题组成,90个点。

The instructions for working on homework assignments are available on Canvas; as a summary:

  • Write your code only where indicated via

    If you write code in other places, it will be discarded during grading.

  • Do not add/remove cells.
  • The tests are implemented with assert statements: if they fail, you will see an error (a Python exception). If you see no error, you can assume that they pass.

Once you are done working on it, you can download the .ipynb and submit to this Google Form.


Assume you have to prepare Pasta Carbonara. My version of the recipe goes like this:

Dice onions and pancetta, and fry in a mix of olive oil and butter, slowly. Separately, put in a bowl as many eggs as there are dinner guests; you can either put in the bowls the yolks only, or you can add a few whites if you wish. Beat the eggs.
Bring water to a boil, and when it boils, salt it. Put the pasta in (I like Penne Rigate). When cooked, colander the water away, and quickly unite in the bowl the beaten eggs, the pasta, and the pancetta. Mix well and serve immediately.

If you have to invite people over, you could do this recipe sequentially, and first worry about cooking the pasta: warming the water, putting the pasta in, then colandering it. Then you could worry about cooking the pancetta and onions. When that’s done, you can start to beat the eggs. Finally, you could unite everything. Technically, that would work, but there would be two problems. The first is that, of course, the pasta would be rather cold by the time it would be served, a capital sin (pasta must be served immediately after it is cooked). Secondly, even if you rehash the order so that you first cook the pancetta, then beat the eggs, then cook the pasta, then technically this works — but it would take you well over one hour to have everything ready. You want to do things in parallel, cooking the pancetta while heating up the water for the pasta, and so forth. You want to discover what are the things that need to be done one after the other, and what are the things that can be done in parallel, and in which order to do everything.

Great cooking, by the way, is much about the perfect timing, not only the perfect preparation. You have to have the various preparations ready at the same time, to unite them just right. We will worry about timing in the second part of this chapter; first, we worry about what we can do and in which order.

As an aside for those of you who are more interested in compiling code than in cooking, the problem of how to compile C or C++ code is very similar. A makefile defines dependencies between tasks: you have to have compiled pathlib.c before you can link the result together with something else. The task of the make program is to figure out how to parallelize the compilation, so that independent tasks can happen in different processes (possibly on different CPU cores), while respecting the precedence constraints between tasks. We will mention this application in some of the exercises of the chapter.

Scheduling dependent tasks

We first disregard the problem of cooking (or compiling) time, and ask about the order in which we should be doing the tasks. We want to create a Scheduler object, that can tell us what to do at the same time. What operations should this object support?

  • add_task: we should be able to add a task, along with the task dependencies.
  • reset: indicating that we are about to run the sequences of tasks again.
  • available_tasks: this property should return the set of things that we can do in parallel.
  • mark_completed: used to notify the scheduler that we have completed a task. This should return the set of new tasks that we can do due to this task being completed; we can do these tasks in parallel alongside with the others that we are already doing.
  • all_done: returns True/False according to whether we have completed all tasks.

Choosing these operations is perhaps the most important step in the design of the scheduler. The operations need to have a simple, clear definition, and be useful in a concrete implementation of the service which will run the tasks. Of the above operations, they are all uncontroversial, except for the choice of behavior of completed. In theory, there is no need for completed to return the set of new tasks that can now be undertaken. If one remembers the set of tasks ?1T1 one can a do before a task ??1t∈T1 is completed, and marks ?t as completed, one can simply ask the scheduler for the set of tasks ?2T2 that can now be done, and add those in ?21?=?2({?}?1)T21t=T2∖({t}∪T1) for execution. However, we guess (as we have not yet written the task execution engine) that being told this set of tasks directly will simplify the design of the task execution engine.

Our scheduler class will be implemented in similar fashion to our graph class, with tasks corresponding to graph vertices, and dependencies represented as edges. The difference is that here, given a vertex (that is, a task) ?v, it will be useful to be able to access both:
  • the predecessors of ?v, that is, the tasks ?u that are declared as prerequisites of ?v, and
  • the successors of ?v, that is, the tasks ?u such that ?v was declared as a prerequisite for ?u.

When we add a task, we would have to initialize its set of successors and predecessors to empty. This is somewhat tedious, and so we resort to a defaultdict, which is a special type of dictionary such that, if the mapping for a key has not been defined, it returns a default value; in our case, an empty set. You can read more about defaultdict and related types here.

Our first implementation of the class is as follows. We let you complete the available_tasks and mark_completed methods.

from collections import defaultdict
import networkx as nx # Library for displaying graphs.
import matplotlib.pyplot as plt

class DependencyScheduler(object):

    def __init__(self):
        self.tasks = set()
        # The successors of a task are the tasks that depend on it, and can
        # only be done once the task is completed.
        self.successors = defaultdict(set)
        # The predecessors of a task have to be done before the task.
        self.predecessors = defaultdict(set)
        self.completed_tasks = set() # completed tasks

    def add_task(self, t, dependencies):
        """Adds a task t with given dependencies."""
        # Makes sure we know about all tasks mentioned.
        assert t not in self.tasks or len(self.predecessors[t]) == 0, "The task was already present."
        # The predecessors are the tasks that need to be done before.
        self.predecessors[t] = set(dependencies)
        # The new task is a successor of its dependencies.
        for u in dependencies:

    def reset(self):
        self.completed_tasks = set()

    def done(self):
        return self.completed_tasks == self.tasks

    def show(self):
        """We use the nx graph to display the graph."""
        g = nx.DiGraph()
        g.add_edges_from([(u, v) for u in self.tasks for v in self.successors[u]])
        node_colors = ''.join([('g' if v in self.completed_tasks else 'r')
                           for v in self.tasks])
        nx.draw(g, with_labels=True, node_color=node_colors)

    def uncompleted(self):
        """Returns the tasks that have not been completed.
        This is a property, so you can say scheduler.uncompleted rather than
        return self.tasks - self.completed_tasks

    def _check(self):
        """We check that if t is a successor of u, then u is a predecessor
        of t."""
        for u in self.tasks:
            for t in self.successors[u]:
                assert u in self.predecessors[t]

Question 1: implement available_tasks and mark_completed.

### Implementation of `available_tasks` and `mark_completed`.

def scheduler_available_tasks(self):
    """Returns the set of tasks that can be done in parallel.
    A task can be done if all its predecessors have been completed.
    And of course, we don't return any task that has already been

def scheduler_mark_completed(self, t):
    """Marks the task t as completed, and returns the additional
    set of tasks that can be done (and that could not be
    previously done) once t is completed."""

DependencyScheduler.available_tasks = property(scheduler_available_tasks)
DependencyScheduler.mark_completed = scheduler_mark_completed

Let us check if this works.

s = DependencyScheduler()
s.add_task('a', ['b', 'c'])
s.add_task('b', ['c', 'e'])

We note that in the above drawing, the edges denote temporal succession, that is, an edge from ?c to ?a means that ?c must happen before ?a. Let us execute the schedule manually.

Here are some tests for available_tasks and mark_completed.


### Simple tests. 5 points. 

s = DependencyScheduler()
s.add_task('a', [])
assert s.available_tasks == {'a'}

s = DependencyScheduler()
assert s.available_tasks == set()

### Slightly more complicated. 4 points. 

s = DependencyScheduler()
s.add_task('a', ['b', 'c'])
s.add_task('b', ['c', 'e'])
assert s.available_tasks == {'e', 'c'}

s = DependencyScheduler()
s.add_task('a', ['b'])
s.add_task('b', ['a'])
assert s.available_tasks == set()

### Now, let's test `mark_completed`.  Simple tests first. 2 points. 

s = DependencyScheduler()
s.add_task('a', [])
assert s.available_tasks, {'a'}
r = s.mark_completed('a')
assert r == set()

s = DependencyScheduler()
s.add_task('a', ['b'])
assert s.available_tasks == {'b'}
r = s.mark_completed('b')
assert r == {'a'}

### Slightly more complicated. 4 points. 

def assert_equal(a, b):
    assert a == b

s = DependencyScheduler()
s.add_task('a', ['b', 'c'])
assert_equal(s.available_tasks, {'b', 'c'})
r = s.mark_completed('b')
assert_equal(r, set())
assert_equal(s.available_tasks, {'c'})
r = s.mark_completed('c')
assert_equal(r, {'a'})

s = DependencyScheduler()
s.add_task('a', ['b', 'c'])
s.add_task('b', ['c', 'e'])
s.add_task('c', [])
assert_equal(s.available_tasks, {'c', 'e'})
r = s.mark_completed('e')
assert_equal(r, set())
r = s.mark_completed('c')
assert_equal(r, {'b'})
r = s.mark_completed('b')
assert_equal(r, {'a'})
r = s.mark_completed('a')
assert_equal(r, set())
assert_equal(s.available_tasks, set())

Executing the tasks

Here is an execution engine for our tasks with dependencies.

import random

def execute_schedule(s, show=False):
    in_process = s.available_tasks
    print("Starting by doing:", in_process)
    while len(in_process) > 0:
        # Picks one random task to be the first to be completed.
        t = random.choice(list(in_process))
        print("Completed:", t)
        in_process = in_process - {t} | s.mark_completed(t)
        print("Now doing:", in_process)
        if show:
    # Have we done all?
    if not s.done:
        print("Error, there are tasks that could not be completed:", s.uncompleted)
s = DependencyScheduler()
s.add_task('a', ['b', 'c'])
s.add_task('b', ['c', 'e'])