Python asyncio.gather(): Run Concurrent Tasks (Complete Guide)
asyncio.gather() is one of the most useful functions in Python's async toolkit. It lets you run multiple async functions concurrently and wait for all of them to finish. This guide explains how it works, when to use it, and how to handle errors.
What is asyncio.gather()?
asyncio.gather() is a function that:
- Takes multiple awaitables (coroutines, Tasks, or Futures) as input
- Runs them concurrently (not sequentially)
- Waits for all of them to complete
- Returns their results in order
Synchronous vs Asynchronous Example
Without asyncio.gather() (Sequential)
import time
def fetch_data(url):
    time.sleep(1)  # Simulates network request
    return f"Data from {url}"
# Sequential: takes 3 seconds
start = time.time()
result1 = fetch_data("api.example.com/user/1") # 1 second
result2 = fetch_data("api.example.com/user/2") # 1 second
result3 = fetch_data("api.example.com/user/3") # 1 second
print(f"Total time: {time.time() - start}s") # 3 seconds
With asyncio.gather() (Concurrent)
import asyncio
async def fetch_data(url):
    await asyncio.sleep(1)  # Simulates network request
    return f"Data from {url}"

async def main():
    # Concurrent: takes ~1 second
    results = await asyncio.gather(
        fetch_data("api.example.com/user/1"),
        fetch_data("api.example.com/user/2"),
        fetch_data("api.example.com/user/3"),
    )
    return results
# Run it
import time
start = time.time()
results = asyncio.run(main())
print(f"Total time: {time.time() - start}s") # ~1 second (much faster!)
print(results) # ['Data from api.example.com/user/1', ...]
The difference: Sequential took 3 seconds. Concurrent took ~1 second. That's 3x faster!
How asyncio.gather() Works
gather(coro1, coro2, coro3)
   ↓      ↓      ↓
 Start  Start  Start
(concurrent, not sequential)
   ↓      ↓      ↓
Wait for all 3 to complete
          ↓
Return [result1, result2, result3] in order
Basic Syntax
results = await asyncio.gather(
    coroutine1,
    coroutine2,
    coroutine3,
)
Returns: A list of results in the same order you passed them.
Real-World Examples
Example 1: Fetch Data from Multiple APIs
import asyncio
import aiohttp
async def fetch_user(session, user_id):
    async with session.get(f'https://api.example.com/users/{user_id}') as response:
        return await response.json()

async def main():
    async with aiohttp.ClientSession() as session:
        # Fetch 3 users concurrently
        users = await asyncio.gather(
            fetch_user(session, 1),
            fetch_user(session, 2),
            fetch_user(session, 3),
        )
        return users
# Run it
users = asyncio.run(main())
print(users) # [user1_data, user2_data, user3_data]
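Why this is fast: Assuming each request takes about 1 second, running them one after another would take about 3 seconds. With gather(), all 3 requests are in flight concurrently, so the total is roughly the time of the slowest request (~1 second).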
Example 2: Process Multiple Files
import asyncio
async def process_file(filename):
    # Simulate file reading
    await asyncio.sleep(1)
    return f"Processed {filename}"

async def main():
    files = ['file1.txt', 'file2.txt', 'file3.txt']
    # Process all files concurrently
    results = await asyncio.gather(
        *[process_file(f) for f in files]
    )
    return results
results = asyncio.run(main())
print(results)
# ['Processed file1.txt', 'Processed file2.txt', 'Processed file3.txt']
Note the * unpacking - it spreads the list into individual arguments.
Example 3: Fetch from List of URLs
import asyncio
import aiohttp
async def fetch_url(session, url):
    async with session.get(url) as response:
        return {
            'url': url,
            'status': response.status,
            'length': len(await response.text())
        }

async def main():
    urls = [
        'https://google.com',
        'https://github.com',
        'https://stackoverflow.com',
    ]
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[fetch_url(session, url) for url in urls]
        )
    return results

results = asyncio.run(main())
for result in results:
    print(f"{result['url']}: {result['status']}")
Results Order is Preserved
asyncio.gather() returns results in the same order you passed the coroutines, even if they finish in a different order.
import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    print(f"Task {name} done")
    return name

async def main():
    results = await asyncio.gather(
        task("A", 3),  # Takes 3 seconds
        task("B", 1),  # Takes 1 second
        task("C", 2),  # Takes 2 seconds
    )
    return results

results = asyncio.run(main())
print(results)  # ['A', 'B', 'C'] - in order, despite B and C finishing before A
Output:
Task B done
Task C done
Task A done
['A', 'B', 'C']  ← Order preserved!
Handling Errors in asyncio.gather()
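Default Behavior: First Error Propagates
By default, if one task raises an exception, gather() immediately propagates that first exception to the caller. The remaining tasks are not cancelled: they keep running in the background, but their results are no longer returned to you.
import asyncio

async def failing_task():
    await asyncio.sleep(1)
    raise ValueError("This task failed!")

async def normal_task():
    await asyncio.sleep(2)
    return "Success"

async def main():
    try:
        results = await asyncio.gather(
            failing_task(),
            normal_task(),
        )
    except ValueError as e:
        print(f"Caught error: {e}")
        # gather() did not cancel normal_task(); it is still running here.
        # It is cancelled only when asyncio.run() shuts down after main() returns.

asyncio.run(main())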
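To see what happens to the other tasks after the first exception, here is a minimal sketch with shortened delays. The `finished` list and the extra `asyncio.sleep()` in `main()` are illustrative assumptions added only to keep the event loop alive long enough to observe the result.

```python
import asyncio

async def failing_task():
    await asyncio.sleep(0.1)
    raise ValueError("This task failed!")

async def normal_task(finished):
    await asyncio.sleep(0.2)
    finished.append("normal_task")  # Only reached if the task was NOT cancelled
    return "Success"

async def main():
    finished = []  # Hypothetical bookkeeping list for this demo
    try:
        await asyncio.gather(failing_task(), normal_task(finished))
    except ValueError as e:
        print(f"Caught error: {e}")
    # Keep the event loop alive so the surviving task can finish.
    await asyncio.sleep(0.3)
    return finished

print(asyncio.run(main()))  # ['normal_task']
```

The surviving task runs to completion because gather() only reports the first exception; it does not cancel its siblings. If you need the siblings cancelled, you have to cancel them yourself.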
Collect All Errors: Use return_exceptions=True
import asyncio

async def failing_task():
    await asyncio.sleep(1)
    raise ValueError("This task failed!")

async def normal_task():
    await asyncio.sleep(2)
    return "Success"

async def main():
    results = await asyncio.gather(
        failing_task(),
        normal_task(),
        return_exceptions=True,  # KEY DIFFERENCE
    )
    # Results now contains both successes AND exceptions
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

asyncio.run(main())
Output:
Task 0 failed: This task failed!
Task 1 succeeded: Success
Use return_exceptions=True when: You want the outcome of every task, successes and failures alike, returned together instead of stopping at the first exception.
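With return_exceptions=True the result list mixes values and exception objects, so a common next step is to split them. The sketch below is one way to do that; `fetch` and `partition` are hypothetical helper names, and it checks against BaseException so that a cancelled task (asyncio.CancelledError) is also counted as a failure.

```python
import asyncio

async def fetch(i):
    # Hypothetical task: odd inputs fail, even inputs succeed.
    await asyncio.sleep(0.01)
    if i % 2:
        raise ValueError(f"item {i} failed")
    return i * 10

def partition(results):
    """Split gather(..., return_exceptions=True) output into (successes, failures)."""
    successes = [r for r in results if not isinstance(r, BaseException)]
    failures = [r for r in results if isinstance(r, BaseException)]
    return successes, failures

async def main():
    results = await asyncio.gather(
        *(fetch(i) for i in range(4)),
        return_exceptions=True,
    )
    return partition(results)

successes, failures = asyncio.run(main())
print(successes)                   # [0, 20]
print([str(e) for e in failures])  # ['item 1 failed', 'item 3 failed']
```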
Comparing asyncio Functions
asyncio.gather() vs asyncio.wait()
| Feature | gather() | wait() |
|---|---|---|
| Simple usage | ✅ Easier | ❌ More verbose |
| Get results | ✅ Automatic | ❌ Manual extraction |
| Return value | List of results | Tuple of (done, pending) task sets |
| Timeout | ❌ No direct timeout | ✅ Built-in timeout parameter |
| Cancel others on error | ❌ No; the first exception propagates, other tasks keep running | ❌ No automatic cancellation |
Use gather(): Most cases, simpler code
Use wait(): When you need fine-grained control over timeout or completion mode
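For comparison, here is a minimal sketch of asyncio.wait() with its timeout parameter. The `job` coroutine and the delays are assumptions for illustration. Note that wait() expects Task or Future objects (passing bare coroutines is rejected on Python 3.11+), and that it does not cancel anything on timeout, so the leftover tasks are cancelled by hand.

```python
import asyncio

async def job(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    # wait() needs Tasks, so wrap each coroutine with create_task().
    tasks = [
        asyncio.create_task(job("fast", 0.05)),
        asyncio.create_task(job("slow", 5)),
    ]
    done, pending = await asyncio.wait(tasks, timeout=0.5)

    # wait() leaves unfinished tasks running; cancel them explicitly.
    for task in pending:
        task.cancel()
    # Let the cancellations be processed before returning.
    await asyncio.gather(*pending, return_exceptions=True)

    return sorted(t.result() for t in done), len(pending)

print(asyncio.run(main()))  # (['fast'], 1)
```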
asyncio.gather() vs asyncio.create_task()
# gather() - simpler
results = await asyncio.gather(task1(), task2(), task3())
# create_task() - more control
t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())
t3 = asyncio.create_task(task3())
results = [await t1, await t2, await t3]
Use gather() for simplicity. Use create_task() when you need to store task references for later cancellation.
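As a sketch of what keeping task references buys you, the example below cancels a slow task once a fast one has produced its result. The `worker` coroutine and its delays are hypothetical.

```python
import asyncio

async def worker(delay):
    await asyncio.sleep(delay)
    return delay

async def main():
    fast = asyncio.create_task(worker(0.05))
    slow = asyncio.create_task(worker(5))

    result = await fast

    # We hold a reference to the slow task, so we can cancel it directly.
    slow.cancel()
    try:
        await slow
    except asyncio.CancelledError:
        pass  # Expected: the task was cancelled on purpose

    return result, slow.cancelled()

print(asyncio.run(main()))  # (0.05, True)
```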
Advanced: Limiting Concurrent Tasks
What if you have 1000 URLs to fetch but don't want to create 1000 concurrent connections?
import asyncio
import aiohttp
async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def bounded_gather(coros, limit=10):
    """Run coroutines with a concurrency limit"""
    semaphore = asyncio.Semaphore(limit)

    async def bounded_coro(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(
        *[bounded_coro(coro) for coro in coros]
    )

async def main():
    urls = [f"https://example.com/{i}" for i in range(100)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await bounded_gather(tasks, limit=10)  # Max 10 concurrent
    return results
Why use this: Prevents overwhelming the server or hitting rate limits. 10 concurrent connections is usually safer than 100.
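To check that the limit is actually respected without touching the network, the sketch below reuses the same semaphore pattern with a stand-in coroutine. `fake_fetch` and the `active`/`peak` counters are assumptions added for the demonstration; they are not part of any library.

```python
import asyncio

async def bounded_gather(coros, limit=10):
    """Run coroutines with a concurrency limit (same pattern as above)."""
    semaphore = asyncio.Semaphore(limit)

    async def bounded_coro(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(bounded_coro(c) for c in coros))

async def main():
    active = 0  # Coroutines currently inside fake_fetch()
    peak = 0    # Highest value of `active` observed

    async def fake_fetch(i):
        # Stand-in for a network request; tracks how many run at once.
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1
        return i

    results = await bounded_gather([fake_fetch(i) for i in range(50)], limit=5)
    return results, peak

results, peak = asyncio.run(main())
print(peak)                         # 5
print(results == list(range(50)))   # True
```

All 50 wrapper tasks are created up front; the semaphore only limits how many of them are doing work at any moment, and the results still come back in input order.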
Performance Comparison
Fetching 10 URLs (assuming about 1 second per request):
Sequential (no asyncio): 10 seconds
asyncio.gather() (concurrent): ~1 second
Speed improvement: 10x faster!
The more I/O-bound operations you have, the more you benefit from gather().
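A rough way to measure this yourself is sketched below, using asyncio.sleep() as a stand-in for I/O. The helper names (`io_task`, `run_sequential`, `run_concurrent`, `timed`) and the 0.05 second delay are assumptions; real timings depend on the network and the server.

```python
import asyncio
import time

async def io_task(i, delay=0.05):
    await asyncio.sleep(delay)  # Simulated I/O wait
    return i

async def run_sequential(n):
    # Each await finishes before the next one starts.
    return [await io_task(i) for i in range(n)]

async def run_concurrent(n):
    # All tasks wait at the same time.
    return await asyncio.gather(*(io_task(i) for i in range(n)))

def timed(coro):
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start

seq_result, seq_time = timed(run_sequential(10))
con_result, con_time = timed(run_concurrent(10))
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

The speedup only appears when tasks spend their time waiting. CPU-bound work inside a coroutine still runs on a single thread and gains nothing from gather().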
Common Mistakes
❌ Mistake 1: Forgetting to await gather()
# WRONG
results = asyncio.gather(task1(), task2())  # Missing await!
print(results)  # Prints a pending Future object, not the results
# CORRECT
results = await asyncio.gather(task1(), task2())
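The sketch below shows what gather() hands back before it is awaited. The `task` coroutine is a placeholder. The call is made inside a running event loop because calling gather() with coroutines outside one is deprecated.

```python
import asyncio

async def task(n):
    await asyncio.sleep(0.01)
    return n

async def main():
    pending = asyncio.gather(task(1), task(2))  # No await yet
    is_future = isinstance(pending, asyncio.Future)
    results = await pending  # Awaiting the Future yields the result list
    return is_future, results

print(asyncio.run(main()))  # (True, [1, 2])
```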
❌ Mistake 2: Mixing synchronous and async code
# WRONG
async def main():
    result1 = fetch_data()  # Oops, forgot await! result1 is a coroutine object, not data
    result2 = await fetch_data()
# CORRECT
async def main():
    result1 = await fetch_data()
    result2 = await fetch_data()
❌ Mistake 3: Not handling exceptions with return_exceptions=True
# WRONG - one failed task crashes everything
results = await asyncio.gather(
    fetch_user(1),
    fetch_user(2),
    fetch_user(3),
)  # If one fails, no partial results!
# CORRECT
results = await asyncio.gather(
    fetch_user(1),
    fetch_user(2),
    fetch_user(3),
    return_exceptions=True
)
Best Practices
- Use gather() for simple cases - Clean, readable code
- Add return_exceptions=True - Resilient to failures
- Limit concurrency - Use semaphores for large task counts
- Set timeouts - Wrap with asyncio.wait_for() for safety
- Log task completion - Use logging to debug concurrent issues
- Test failure scenarios - What happens if a task hangs?
Timeout Example
async def main():
    try:
        results = await asyncio.wait_for(
            asyncio.gather(
                fetch_data(1),
                fetch_data(2),
                fetch_data(3),
            ),
            timeout=5.0  # Wait max 5 seconds
        )
    except asyncio.TimeoutError:
        # On timeout, gather() is cancelled, which also cancels its unfinished tasks
        print("One or more tasks took too long")
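Wrapping the whole gather() in a single timeout discards every result if any one task is slow. An alternative, sketched below under the assumption that partial results are acceptable, is to give each coroutine its own asyncio.wait_for() and collect timeouts with return_exceptions=True. Here `fetch_data` takes a delay in seconds purely for illustration.

```python
import asyncio

async def fetch_data(delay):
    await asyncio.sleep(delay)
    return f"done after {delay}s"

async def main():
    delays = [0.01, 5, 0.02]
    results = await asyncio.gather(
        # Each coroutine gets its own 0.2 second budget.
        *(asyncio.wait_for(fetch_data(d), timeout=0.2) for d in delays),
        return_exceptions=True,
    )
    # Replace timeout exceptions with a marker; keep successful results.
    return ["timeout" if isinstance(r, asyncio.TimeoutError) else r for r in results]

print(asyncio.run(main()))
# ['done after 0.01s', 'timeout', 'done after 0.02s']
```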
Conclusion
asyncio.gather() is the go-to tool for running multiple async operations concurrently in Python:
- Faster than sequential execution for I/O-bound work (3x and 10x in the examples above)
- Simple API - pass coroutines, get results in order
- Error handling - use return_exceptions=True for resilience
- Scalable - use semaphores to limit concurrency
Master asyncio.gather() and you have a solid foundation for concurrent I/O in Python.