Understanding Topological Sort
What is Topological Sort?
A linear ordering of vertices in a Directed Acyclic Graph (DAG) such that for every directed edge u → v, vertex u comes before v in the ordering.
- 📌 Only possible for DAGs (no cycles)
- 📊 Multiple valid orderings may exist
- 🔄 Used for dependency resolution
Real-World Examples
Course Prerequisites
Taking courses in a university where some courses require others as prerequisites.
Courses: [Math101, CS101, Math201, CS201, CS301]
Prerequisites:
- Math101 → Math201
- CS101 → CS201
- Math201 → CS301
- CS201 → CS301
Valid Order: Math101, CS101, Math201, CS201, CS301
DAG Visualization
Kahn's Algorithm (BFS-based)
Algorithm Steps
- Calculate in-degree for all vertices
- Add all vertices with in-degree 0 to a queue
- While queue is not empty:
  - Remove vertex from queue and add to result
  - Reduce in-degree of all neighbors
  - If neighbor's in-degree becomes 0, add to queue
- If result contains all vertices, we have a valid ordering; otherwise the graph contains a cycle
Implementation
from collections import deque, defaultdict
def topological_sort_kahns(vertices, edges):
    """
    Order the vertices of a directed graph with Kahn's (BFS) algorithm.

    Args:
        vertices: Sized collection of hashable vertex labels.
        edges: Iterable of (u, v) pairs meaning "u comes before v".
            Every v must be listed in vertices, otherwise KeyError is raised.

    Returns:
        A list with the vertices in a valid topological order, or None when
        not every vertex could be emitted (the graph contains a cycle).

    Time: O(V + E), Space: O(V + E) for the adjacency list
    """
    successors = defaultdict(list)
    pending = dict.fromkeys(vertices, 0)  # unmet incoming edges per vertex
    for src, dst in edges:
        successors[src].append(dst)
        pending[dst] += 1

    # Vertices with nothing in front of them can be emitted right away.
    ready = deque(v for v in vertices if pending[v] == 0)
    ordering = []
    while ready:
        current = ready.popleft()
        ordering.append(current)
        # Emitting `current` satisfies one incoming edge of each successor.
        for nxt in successors[current]:
            pending[nxt] -= 1
            if pending[nxt] == 0:
                ready.append(nxt)

    # Vertices on a cycle never reach in-degree 0 and are therefore missing.
    return ordering if len(ordering) == len(vertices) else None
# Example usage
# A, B and C have no incoming edges; D needs A and B; E needs C and D.
vertices = ['A', 'B', 'C', 'D', 'E']
edges = [('A', 'D'), ('B', 'D'), ('C', 'E'), ('D', 'E')]
print(topological_sort_kahns(vertices, edges))
# Output: ['A', 'B', 'C', 'D', 'E']
Cycle Detection
def has_cycle_kahns(vertices, edges):
    """
    Report whether a directed graph contains a cycle (Kahn's algorithm).

    Args:
        vertices: Sized collection of hashable vertex labels.
        edges: Iterable of (u, v) pairs; every v must be listed in vertices.

    Returns:
        True if at least one vertex can never be removed (cycle exists),
        False otherwise.
    """
    adjacency = defaultdict(list)
    remaining_in = dict.fromkeys(vertices, 0)
    for tail, head in edges:
        adjacency[tail].append(head)
        remaining_in[head] += 1

    frontier = deque(v for v in vertices if remaining_in[v] == 0)
    removed = 0
    while frontier:
        removed += 1
        for head in adjacency[frontier.popleft()]:
            remaining_in[head] -= 1
            if remaining_in[head] == 0:
                frontier.append(head)

    # Anything left over is stuck behind a cycle.
    return removed != len(vertices)
DFS-based Topological Sort
DFS Algorithm
Uses post-order traversal with a stack to build the topological order.
- 🔍 Visit each unvisited vertex
- 📚 Recursively visit all neighbors
- 🎯 Add vertex to stack after visiting all neighbors
- ↩️ Reverse the stack for final order
Implementation with Cycle Detection
def topological_sort_dfs(vertices, edges):
    """
    DFS-based topological sorting (iterative, so deep graphs are safe).

    The traversal is the classic three-colour DFS, driven by an explicit
    stack instead of recursion so that long dependency chains do not hit
    Python's recursion limit.

    Args:
        vertices: Iterable of hashable vertex labels.
        edges: Iterable of (u, v) pairs meaning "u comes before v".
            Every reachable v must be listed in vertices (KeyError otherwise).

    Returns:
        A list of vertices in topological order, or None if a cycle exists.

    Time: O(V + E), Space: O(V + E)
    """
    # Build adjacency list
    graph = defaultdict(list)
    for u, v in edges:
        graph[u].append(v)

    # Color states: WHITE = unvisited, GRAY = on the current path, BLACK = done
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {v: WHITE for v in vertices}
    post_order = []

    for root in vertices:
        if color[root] != WHITE:
            continue
        color[root] = GRAY
        # Each frame keeps a live iterator so a vertex resumes where it left off.
        frames = [(root, iter(graph[root]))]
        while frames:
            vertex, neighbors = frames[-1]
            for neighbor in neighbors:
                state = color[neighbor]
                if state == GRAY:
                    return None  # Back edge: neighbor is on the current path
                if state == WHITE:
                    color[neighbor] = GRAY
                    frames.append((neighbor, iter(graph[neighbor])))
                    break  # Descend first; come back to this frame later
            else:
                # All neighbors finished: emit the vertex in post-order.
                color[vertex] = BLACK
                post_order.append(vertex)
                frames.pop()

    return post_order[::-1]  # Reverse post-order is a topological order
# Example with cycle detection
# A simple chain A -> B -> C -> D has exactly one valid ordering.
vertices = ['A', 'B', 'C', 'D']
edges = [('A', 'B'), ('B', 'C'), ('C', 'D')]
print(topological_sort_dfs(vertices, edges))
# Output: ['A', 'B', 'C', 'D']
# Example with cycle
# The edge C -> A closes the loop A -> B -> C -> A, so no ordering exists.
edges_with_cycle = [('A', 'B'), ('B', 'C'), ('C', 'A')]
print(topological_sort_dfs(['A', 'B', 'C'], edges_with_cycle))
# Output: None (cycle detected)
All Topological Sorts
def all_topological_sorts(vertices, edges):
    """
    Generate all possible topological sorts.

    Candidates are tried in the order they appear in `vertices`, so the
    list of results is deterministic (it does not depend on set/hash order).

    Args:
        vertices: Iterable of hashable vertex labels (duplicates are ignored).
        edges: Iterable of (u, v) pairs meaning "u comes before v".
            Every v must be listed in vertices (KeyError otherwise).

    Returns:
        A list of orderings, each a list of vertices. The list is empty when
        the graph has a cycle and is [[]] for an empty graph.
    """
    graph = defaultdict(list)
    in_degree = {v: 0 for v in vertices}
    for u, v in edges:
        graph[u].append(v)
        in_degree[v] += 1

    candidates = list(in_degree)  # unique vertices, in input order
    placed = set()
    current_sort = []
    all_sorts = []

    def backtrack():
        if len(current_sort) == len(candidates):
            all_sorts.append(current_sort[:])
            return
        # Try each not-yet-placed vertex whose prerequisites are all placed
        for vertex in candidates:
            if vertex in placed or in_degree[vertex] != 0:
                continue
            # Choose vertex
            placed.add(vertex)
            current_sort.append(vertex)
            for neighbor in graph[vertex]:
                in_degree[neighbor] -= 1
            # Recurse
            backtrack()
            # Backtrack: restore the state exactly as it was before the choice
            for neighbor in graph[vertex]:
                in_degree[neighbor] += 1
            current_sort.pop()
            placed.remove(vertex)

    backtrack()
    return all_sorts
# Example
# Only constraint: A before C. B is free, so three orderings are valid.
vertices = ['A', 'B', 'C']
edges = [('A', 'C')]
print(all_topological_sorts(vertices, edges))
# Output: the three valid orderings, e.g.
# [['A', 'B', 'C'], ['A', 'C', 'B'], ['B', 'A', 'C']]
Real-World Applications
1. Build System Dependencies
class BuildSystem:
    """Registry of build tasks that can compute a dependency-respecting order."""

    def __init__(self):
        # task -> list of tasks it depends on
        self.tasks = {}
        # task -> list of tasks that depend on it (reverse edges)
        self.dependencies = defaultdict(list)

    def add_task(self, task, deps=None):
        """
        Add a build task with dependencies.

        Registering a task again replaces its previous dependency list; the
        reverse edges of the old definition are removed so they cannot
        release the task too early in build_order().

        Args:
            task: Hashable task identifier.
            deps: Optional iterable of task identifiers this task needs.
        """
        # Drop reverse edges left over from an earlier definition of `task`.
        for old_dep in self.tasks.get(task, ()):
            self.dependencies[old_dep].remove(task)

        # Copy so later mutation of the caller's list cannot desync the graph.
        self.tasks[task] = list(deps) if deps else []
        for dep in self.tasks[task]:
            self.dependencies[dep].append(task)

    def build_order(self):
        """
        Determine build order using topological sort (Kahn's algorithm).

        Returns:
            A list of all registered tasks with every task after its
            dependencies, or None when no such order exists: either the
            dependencies form a cycle, or some dependency was never
            registered with add_task().
        """
        in_degree = {task: len(deps) for task, deps in self.tasks.items()}
        queue = deque(task for task, degree in in_degree.items() if degree == 0)
        order = []
        while queue:
            task = queue.popleft()
            order.append(task)
            # .get avoids creating empty entries in the defaultdict on reads.
            for dependent in self.dependencies.get(task, ()):
                in_degree[dependent] -= 1
                if in_degree[dependent] == 0:
                    queue.append(dependent)
        return order if len(order) == len(self.tasks) else None
# Example: Building a project
# Each task depends on the one registered just before it, forming a chain.
build = BuildSystem()
build.add_task('compile_utils', [])
build.add_task('compile_core', ['compile_utils'])
build.add_task('compile_app', ['compile_core'])
build.add_task('run_tests', ['compile_app'])
build.add_task('package', ['run_tests'])
print(build.build_order())
# Output: ['compile_utils', 'compile_core', 'compile_app', 'run_tests', 'package']
2. Course Schedule Problem
def can_finish_courses(num_courses, prerequisites):
    """
    Decide whether every course can be completed.

    Args:
        num_courses: Number of courses, labelled 0 .. num_courses - 1.
        prerequisites: Iterable of [course, prerequisite] pairs.

    Returns:
        True if all courses can be taken in some order (no circular
        prerequisites), False otherwise.
    """
    unlocks = defaultdict(list)      # prerequisite -> courses it opens up
    blockers = [0] * num_courses     # outstanding prerequisites per course
    for course, prereq in prerequisites:
        unlocks[prereq].append(course)
        blockers[course] += 1

    # Courses without prerequisites can be taken immediately.
    available = deque(c for c, count in enumerate(blockers) if count == 0)
    finished = 0
    while available:
        done = available.popleft()
        finished += 1
        for follow_up in unlocks[done]:
            blockers[follow_up] -= 1
            if blockers[follow_up] == 0:
                available.append(follow_up)

    return finished == num_courses
def find_course_order(num_courses, prerequisites):
    """
    Find an order in which all courses can be taken.

    Args:
        num_courses: Number of courses, labelled 0 .. num_courses - 1.
        prerequisites: Iterable of [course, prerequisite] pairs.

    Returns:
        A list of course numbers with every prerequisite before the course
        that needs it, or [] when no complete order exists.
    """
    unlocks = defaultdict(list)
    blockers = [0] * num_courses
    for course, prereq in prerequisites:
        unlocks[prereq].append(course)
        blockers[course] += 1

    available = deque(c for c, count in enumerate(blockers) if count == 0)
    plan = []
    while available:
        taken = available.popleft()
        plan.append(taken)
        for follow_up in unlocks[taken]:
            blockers[follow_up] -= 1
            if blockers[follow_up] == 0:
                available.append(follow_up)

    if len(plan) != num_courses:
        return []  # some courses are locked behind a cycle
    return plan
# Example
# Course 0 unlocks 1 and 2; course 3 needs both 1 and 2.
num_courses = 4
prerequisites = [[1, 0], [2, 0], [3, 1], [3, 2]]
print(can_finish_courses(num_courses, prerequisites)) # True
print(find_course_order(num_courses, prerequisites)) # [0, 1, 2, 3]
3. Task Scheduling with Deadlines
def schedule_tasks_with_deadlines(tasks, dependencies):
    """
    Schedule tasks sequentially, respecting dependencies and preferring
    the earliest deadline among the tasks that are currently runnable.

    Args:
        tasks: Iterable of (name, duration, deadline) tuples.
        dependencies: Iterable of (prerequisite, task) pairs; every task in
            a pair must appear in `tasks` (KeyError otherwise).

    Returns:
        A list of dicts, one per scheduled task in execution order, with the
        keys 'task', 'start', 'end', 'deadline' and 'on_time'. Tasks that can
        never become runnable (e.g. because they are part of a dependency
        cycle) are omitted from the result.
    """
    # Local import: the module's import block only provides deque/defaultdict.
    import heapq

    task_names = [t[0] for t in tasks]
    task_info = {t[0]: {'duration': t[1], 'deadline': t[2]} for t in tasks}

    graph = defaultdict(list)
    in_degree = {name: 0 for name in task_names}
    for prereq, task in dependencies:
        graph[prereq].append(task)
        in_degree[task] += 1

    # Min-heap of runnable tasks keyed by (deadline, arrival number). The
    # arrival number breaks deadline ties in first-come order, which is what
    # repeatedly stable-sorting a FIFO queue would produce, at O(log n) per
    # operation instead of a full sort per step.
    ready = []
    arrivals = 0
    for name in task_names:
        if in_degree[name] == 0:
            heapq.heappush(ready, (task_info[name]['deadline'], arrivals, name))
            arrivals += 1

    schedule = []
    current_time = 0
    while ready:
        deadline, _, task = heapq.heappop(ready)
        start_time = current_time
        end_time = current_time + task_info[task]['duration']
        schedule.append({
            'task': task,
            'start': start_time,
            'end': end_time,
            'deadline': deadline,
            'on_time': end_time <= deadline
        })
        current_time = end_time

        # Finishing this task may make its dependents runnable.
        for next_task in graph[task]:
            in_degree[next_task] -= 1
            if in_degree[next_task] == 0:
                heapq.heappush(
                    ready, (task_info[next_task]['deadline'], arrivals, next_task)
                )
                arrivals += 1

    return schedule
# Example
# Each task is (name, duration, deadline).
tasks = [
    ('Design', 3, 5),
    ('Implement', 5, 12),
    ('Test', 2, 15),
    ('Deploy', 1, 16)
]
# Each dependency is (prerequisite, task): a strict chain in this example.
dependencies = [
    ('Design', 'Implement'),
    ('Implement', 'Test'),
    ('Test', 'Deploy')
]
schedule = schedule_tasks_with_deadlines(tasks, dependencies)
# Print one line per scheduled task with its time window and deadline status.
for item in schedule:
    print(f"{item['task']}: Day {item['start']}-{item['end']} "
          f"(Deadline: Day {item['deadline']}, "
          f"On time: {item['on_time']})")
Advanced Problems
1. Alien Dictionary
def alien_order(words):
    """
    Derive an alien alphabet order from a lexicographically sorted word list.

    Successors are kept in first-seen order, so when several alphabets are
    consistent with the input the same one is returned on every run.

    Args:
        words: List of strings sorted according to the unknown alphabet.

    Returns:
        A string containing every character that occurs in `words`, in an
        order consistent with the word list. Returns "" when the input is
        contradictory: a longer word precedes its own prefix, or the derived
        constraints form a cycle.
    """
    successors = defaultdict(list)  # char -> chars known to come after it
    seen_edges = set()              # (before, after) pairs already recorded
    in_degree = {}

    # Register every character, including ones with no ordering constraint.
    for word in words:
        for char in word:
            in_degree.setdefault(char, 0)

    # Only the first differing position of adjacent words carries information.
    for w1, w2 in zip(words, words[1:]):
        for c1, c2 in zip(w1, w2):
            if c1 != c2:
                if (c1, c2) not in seen_edges:
                    seen_edges.add((c1, c2))
                    successors[c1].append(c2)
                    in_degree[c2] += 1
                break
        else:
            # No difference within the common prefix: w1 must not be longer.
            if len(w1) > len(w2):
                return ""

    # Topological sort using Kahn's algorithm
    queue = deque(c for c in in_degree if in_degree[c] == 0)
    result = []
    while queue:
        char = queue.popleft()
        result.append(char)
        for next_char in successors[char]:
            in_degree[next_char] -= 1
            if in_degree[next_char] == 0:
                queue.append(next_char)

    # Characters stuck on a cycle never get emitted.
    if len(result) != len(in_degree):
        return ""
    return ''.join(result)
# Example
# Adjacent pairs give t < f, w < e, r < t and e < r, i.e. w, e, r, t, f.
words = ["wrt", "wrf", "er", "ett", "rftt"]
print(alien_order(words)) # "wertf"
2. Parallel Courses
def minimum_semesters(n, relations):
    """
    Compute the fewest semesters needed to take all n courses.

    Any number of courses may be taken in one semester as long as all of
    their prerequisites were completed in earlier semesters.

    Args:
        n: Number of courses, labelled 1 .. n.
        relations: Iterable of [prerequisite, course] pairs.

    Returns:
        The minimum number of semesters, or -1 if the prerequisites contain
        a cycle and not every course can be taken.
    """
    unlocks = defaultdict(list)
    blockers = [0] * (n + 1)  # index 0 is unused; courses start at 1
    for prereq, course in relations:
        unlocks[prereq].append(course)
        blockers[course] += 1

    # Process the graph one "layer" (semester) at a time.
    this_term = [c for c in range(1, n + 1) if blockers[c] == 0]
    terms = 0
    taken = 0
    while this_term:
        terms += 1
        taken += len(this_term)
        next_term = []
        for course in this_term:
            for follow_up in unlocks[course]:
                blockers[follow_up] -= 1
                if blockers[follow_up] == 0:
                    next_term.append(follow_up)
        this_term = next_term

    if taken != n:
        return -1
    return terms
# Example
# Courses 1 and 2 can be taken together; course 3 needs both of them.
n = 3
relations = [[1, 3], [2, 3]]
print(minimum_semesters(n, relations)) # 2 semesters
3. Sequence Reconstruction
def sequence_reconstruction(org, seqs):
    """
    Check whether org is the one and only topological order implied by seqs.

    Args:
        org: List holding the candidate sequence.
        seqs: List of subsequences; consecutive elements of each one impose
            a "comes before" constraint.

    Returns:
        True if the constraints admit exactly one ordering and it equals
        org, False otherwise (including when seqs is empty).
    """
    if not seqs:
        return False

    followers = defaultdict(set)
    in_degree = defaultdict(int)
    nodes = set()
    for seq in seqs:
        for num in seq:
            nodes.add(num)
            in_degree.setdefault(num, 0)
        # Each adjacent pair contributes one edge (counted only once).
        for before, after in zip(seq, seq[1:]):
            if after not in followers[before]:
                followers[before].add(after)
                in_degree[after] += 1

    # seqs must mention exactly the elements of org.
    if len(nodes) != len(org) or nodes != set(org):
        return False

    ready = deque(n for n in nodes if in_degree[n] == 0)
    rebuilt = []
    while ready:
        # Two or more candidates mean the order is not unique.
        if len(ready) != 1:
            return False
        num = ready.popleft()
        rebuilt.append(num)
        for successor in followers[num]:
            in_degree[successor] -= 1
            if in_degree[successor] == 0:
                ready.append(successor)

    return rebuilt == org
# Example
# The pairs force 1 before 2 before 3, so [1, 2, 3] is the only ordering.
org = [1, 2, 3]
seqs = [[1, 2], [1, 3], [2, 3]]
print(sequence_reconstruction(org, seqs)) # True
Practice Exercises
💡 Key Points to Remember:
- Topological sort only works on DAGs (no cycles)
- Multiple valid orderings may exist
- Use Kahn's algorithm when you need cycle detection (count the processed vertices) or level-by-level processing
- Use DFS when a compact recursive implementation is sufficient
- Time complexity: O(V + E) for both approaches
Problem Set
Exercise 1: Verify DAG
Write a function to check if a directed graph is acyclic.
def is_dag(vertices, edges):
    """
    Check if graph is a DAG

    Exercise stub: the body is intentionally left for the reader, so the
    function currently returns None for every input.

    Args:
        vertices: Collection of vertex labels.
        edges: Collection of (u, v) pairs describing directed edges u -> v.

    Intended result (per the test cases that follow): True when the graph
    has no cycle, False otherwise.

    Your implementation here
    """
    # Hint: Use either DFS with colors or Kahn's algorithm
    pass
# Test cases
# The trailing comments give the result expected from a finished solution;
# the unimplemented stub prints None.
print(is_dag([1, 2, 3], [(1, 2), (2, 3)])) # True (expected once implemented)
print(is_dag([1, 2, 3], [(1, 2), (2, 3), (3, 1)])) # False (expected once implemented)
Exercise 2: Longest Path in DAG
Find the longest path in a DAG using topological sort.
def longest_path_dag(vertices, edges):
    """
    Find the length (number of edges) of the longest path in a DAG.

    Vertices are processed in topological order (Kahn's algorithm); when a
    vertex is dequeued its own distance is final, so it can be pushed on to
    its successors.

    Args:
        vertices: Iterable of hashable vertex labels.
        edges: Iterable of (u, v) pairs; every v must be listed in vertices.

    Returns:
        The number of edges on the longest path, 0 for a graph with no
        vertices or no edges. Cycles are not detected: vertices on a cycle
        are never dequeued, so the result is not meaningful for such input.
    """
    graph = defaultdict(list)
    in_degree = {v: 0 for v in vertices}
    for u, v in edges:
        graph[u].append(v)
        in_degree[v] += 1

    # dist[v] = longest path (in edges) ending at v found so far
    dist = {v: 0 for v in vertices}

    queue = deque(v for v in vertices if in_degree[v] == 0)
    while queue:
        vertex = queue.popleft()
        for neighbor in graph[vertex]:
            # Relax: the best path to neighbor may now run through vertex.
            dist[neighbor] = max(dist[neighbor], dist[vertex] + 1)
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    # default=0 keeps an empty graph from raising ValueError.
    return max(dist.values(), default=0)
# Test
# Longest chains are A -> B -> D -> E and A -> C -> D -> E (3 edges each).
vertices = ['A', 'B', 'C', 'D', 'E']
edges = [('A', 'B'), ('A', 'C'), ('B', 'D'), ('C', 'D'), ('D', 'E')]
print(longest_path_dag(vertices, edges)) # 3
Exercise 3: Critical Path Method
Find the critical path in a project network.
def critical_path(tasks):
    """
    Find the critical path in a project network.

    A forward pass over a topological order yields each task's earliest
    start; a backward pass yields its latest start that does not delay the
    project. Tasks whose earliest and latest start coincide have no slack
    and are reported as critical.

    Args:
        tasks: Iterable of (name, duration, dependencies) tuples, where
            dependencies is a list of task names that must finish first.

    Returns:
        A tuple (critical, project_duration): the zero-slack tasks in
        topological order, and the total project length. Returns ([], 0)
        when there are no tasks. Tasks that can never be scheduled (cyclic
        or unknown dependencies) are left out of both values.
    """
    task_map = {t[0]: {'duration': t[1], 'deps': t[2]} for t in tasks}

    # Reverse edges: dependency -> tasks waiting on it, so that finishing a
    # task only touches its own dependents instead of scanning every task.
    dependents = defaultdict(list)
    for name, info in task_map.items():
        for dep in info['deps']:
            dependents[dep].append(name)

    in_degree = {name: len(info['deps']) for name, info in task_map.items()}
    queue = deque(name for name, degree in in_degree.items() if degree == 0)

    # Forward pass: earliest start times
    earliest = {name: 0 for name in task_map}
    topo_order = []
    while queue:
        task = queue.popleft()
        topo_order.append(task)
        finish = earliest[task] + task_map[task]['duration']
        for other_task in dependents.get(task, ()):
            earliest[other_task] = max(earliest[other_task], finish)
            in_degree[other_task] -= 1
            if in_degree[other_task] == 0:
                queue.append(other_task)

    project_duration = max(
        (earliest[t] + task_map[t]['duration'] for t in topo_order),
        default=0,
    )

    # Backward pass: latest start times. A task with no dependents may
    # start as late as project_duration minus its own duration.
    latest = {
        name: project_duration - info['duration']
        for name, info in task_map.items()
    }
    for task in reversed(topo_order):
        for dep_task in task_map[task]['deps']:
            latest[dep_task] = min(
                latest[dep_task],
                latest[task] - task_map[dep_task]['duration']
            )

    # Critical tasks have zero slack (earliest start == latest start)
    critical = [task for task in topo_order
                if earliest[task] == latest[task]]
    return critical, project_duration
# Example
# Each task is (name, duration, dependencies).
# The longest chain is A -> C -> E with 3 + 4 + 2 = 9 time units.
tasks = [
    ('A', 3, []),
    ('B', 2, []),
    ('C', 4, ['A']),
    ('D', 3, ['A', 'B']),
    ('E', 2, ['C', 'D'])
]
critical, duration = critical_path(tasks)
print(f"Critical Path: {critical}")
print(f"Project Duration: {duration}")
Challenge Problems
| Problem | Difficulty | Key Technique |
|---|---|---|
| Course Schedule III | Hard | Topological Sort + Greedy |
| Find Eventual Safe States | Medium | Reverse Graph + Topological Sort |
| Reconstruct Itinerary | Medium | Modified DFS |