Python asyncio.gather(): Run Concurrent Tasks (Complete Guide)

Learn how to run multiple async functions concurrently with Python asyncio.gather(). Includes practical examples, error handling, and performance comparisons.

March 1, 2026 | 18 min read | By Mathematicon

asyncio.gather() is one of the most useful functions in Python's async toolkit. It lets you run multiple async functions concurrently and wait for all of them to finish. This guide explains how it works, when to use it, and how to handle errors.

What is asyncio.gather()?

asyncio.gather() is a function that:

  • Takes multiple awaitables (coroutines, Tasks, or Futures) as input
  • Runs them concurrently (not sequentially)
  • Waits for all of them to complete
  • Returns their results in order

Synchronous vs Asynchronous Example

Without asyncio.gather() (Sequential)

import time

def fetch_data(url):
    time.sleep(1)  # Simulates network request
    return f"Data from {url}"

# Sequential: takes 3 seconds
start = time.time()
result1 = fetch_data("api.example.com/user/1")  # 1 second
result2 = fetch_data("api.example.com/user/2")  # 1 second
result3 = fetch_data("api.example.com/user/3")  # 1 second
print(f"Total time: {time.time() - start}s")  # 3 seconds

With asyncio.gather() (Concurrent)

import asyncio

async def fetch_data(url):
    await asyncio.sleep(1)  # Simulates network request
    return f"Data from {url}"

async def main():
    # Concurrent: takes ~1 second
    results = await asyncio.gather(
        fetch_data("api.example.com/user/1"),
        fetch_data("api.example.com/user/2"),
        fetch_data("api.example.com/user/3"),
    )
    return results

# Run it
import time
start = time.time()
results = asyncio.run(main())
print(f"Total time: {time.time() - start}s")  # ~1 second (much faster!)
print(results)  # ['Data from api.example.com/user/1', ...]

The difference: Sequential took 3 seconds. Concurrent took ~1 second. That's 3x faster!

How asyncio.gather() Works

gather(coro1, coro2, coro3)
       ↓         ↓        ↓
    Start      Start     Start
    (concurrent, not sequential)
       ↓         ↓        ↓
    Wait for all 3 to complete
       ↓
    Return [result1, result2, result3] in order

Basic Syntax

results = await asyncio.gather(
    coroutine1,
    coroutine2,
    coroutine3,
)

Returns: A list of results in the same order you passed them.
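
As a minimal sketch of that return value (the helper name square is made up for illustration), the snippet below passes both bare coroutines and an already-created Task to gather() and checks that a plain list comes back in argument order:

```python
import asyncio

async def square(n):
    # Hypothetical helper: yields to the event loop once, then returns n * n.
    await asyncio.sleep(0)
    return n * n

async def main():
    # A Task is already scheduled; gather() accepts it alongside bare coroutines.
    task = asyncio.create_task(square(2))
    return await asyncio.gather(square(1), task, square(3))

results = asyncio.run(main())
print(type(results).__name__, results)  # list [1, 4, 9]
```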

Real-World Examples

Example 1: Fetch Data from Multiple APIs

import asyncio
import aiohttp

async def fetch_user(session, user_id):
    async with session.get(f'https://api.example.com/users/{user_id}') as response:
        return await response.json()

async def main():
    async with aiohttp.ClientSession() as session:
        # Fetch 3 users concurrently
        users = await asyncio.gather(
            fetch_user(session, 1),
            fetch_user(session, 2),
            fetch_user(session, 3),
        )
        return users

# Run it
users = asyncio.run(main())
print(users)  # [user1_data, user2_data, user3_data]

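Why this is fast: If each request takes about 1 second, awaiting them one by one takes about 3 seconds. With gather(), all 3 requests are in flight concurrently on a single thread, so the total is roughly the time of the slowest request (about 1 second).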

Example 2: Process Multiple Files

import asyncio

async def process_file(filename):
    # Simulate file reading
    await asyncio.sleep(1)
    return f"Processed {filename}"

async def main():
    files = ['file1.txt', 'file2.txt', 'file3.txt']

    # Process all files concurrently
    results = await asyncio.gather(
        *[process_file(f) for f in files]
    )
    return results

results = asyncio.run(main())
print(results)
# ['Processed file1.txt', 'Processed file2.txt', 'Processed file3.txt']

Note the * unpacking - it spreads the list into individual arguments.

Example 3: Fetch from List of URLs

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return {
            'url': url,
            'status': response.status,
            'length': len(await response.text())
        }

async def main():
    urls = [
        'https://google.com',
        'https://github.com',
        'https://stackoverflow.com',
    ]

    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[fetch_url(session, url) for url in urls]
        )
    return results

results = asyncio.run(main())
for result in results:
    print(f"{result['url']}: {result['status']}")

Results Order is Preserved

asyncio.gather() returns results in the same order you passed the coroutines, even if they finish in a different order.

import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    print(f"Task {name} done")
    return name

async def main():
    results = await asyncio.gather(
        task("A", 3),  # Takes 3 seconds
        task("B", 1),  # Takes 1 second
        task("C", 2),  # Takes 2 seconds
    )
    return results

results = asyncio.run(main())
print(results)  # ['A', 'B', 'C'] - in order, despite B and C finishing before A

Output:

Task B done
Task C done
Task A done
['A', 'B', 'C']  ← Order preserved!

Handling Errors in asyncio.gather()

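Default Behavior: First Error Propagates Immediately

By default, the first exception raised by any task is propagated immediately to the code awaiting gather(). The remaining tasks are not cancelled: they keep running in the background, but their results are no longer delivered through gather().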

async def failing_task():
    await asyncio.sleep(1)
    raise ValueError("This task failed!")

async def normal_task():
    await asyncio.sleep(2)
    return "Success"

async def main():
    try:
        results = await asyncio.gather(
            failing_task(),
            normal_task(),
        )
    except ValueError as e:
        print(f"Caught error: {e}")
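        # gather() does not cancel normal_task(); it is still running here.
        # It only gets cancelled when asyncio.run() cleans up after main() returns.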

asyncio.run(main())
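
Note that gather() itself does not cancel the other awaitables when one of them fails. The following sketch, which uses shortened delays and a hypothetical slow_task helper, keeps a Task reference so the surviving task can be inspected after the exception has been caught:

```python
import asyncio

finished = []  # records which tasks ran to completion

async def failing_task():
    await asyncio.sleep(0.1)
    raise ValueError("This task failed!")

async def slow_task():
    await asyncio.sleep(0.3)
    finished.append("slow_task")
    return "Success"

async def main():
    # Keep a reference so the surviving task can be inspected later.
    slow = asyncio.create_task(slow_task())
    try:
        await asyncio.gather(failing_task(), slow)
    except ValueError as e:
        print(f"Caught error: {e}")
    # gather() has raised, but slow_task is still pending, not cancelled.
    was_cancelled = slow.cancelled()
    # Awaiting the task directly still yields its result.
    print(f"Result after the error: {await slow}")
    return was_cancelled

was_cancelled = asyncio.run(main())
print(was_cancelled)  # False
```

If the leftover work should stop when a sibling fails, cancel the stored tasks explicitly in the except block.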

Collect All Errors: Use return_exceptions=True

async def failing_task():
    await asyncio.sleep(1)
    raise ValueError("This task failed!")

async def normal_task():
    await asyncio.sleep(2)
    return "Success"

async def main():
    results = await asyncio.gather(
        failing_task(),
        normal_task(),
        return_exceptions=True,  # KEY DIFFERENCE
    )

    # Results now contains both successes AND exceptions
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

asyncio.run(main())

Output:

Task 0 failed: This task failed!
Task 1 succeeded: Success

Use return_exceptions=True when: You want all tasks to complete even if some fail.
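
One way to consume such a mixed result list is to pair each input with its outcome and split successes from failures. This is a sketch under assumptions: fetch is a made-up stand-in that fails for odd inputs, and the check uses BaseException because asyncio.CancelledError no longer derives from Exception on Python 3.8 and later.

```python
import asyncio

async def fetch(n):
    # Hypothetical stand-in for a network call: odd inputs fail.
    await asyncio.sleep(0.01)
    if n % 2:
        raise ValueError(f"bad input {n}")
    return n * 10

async def main():
    inputs = [0, 1, 2, 3]
    results = await asyncio.gather(
        *(fetch(n) for n in inputs),
        return_exceptions=True,
    )
    # Results come back in input order, so zip() pairs them correctly.
    ok = {n: r for n, r in zip(inputs, results) if not isinstance(r, BaseException)}
    failed = {n: r for n, r in zip(inputs, results) if isinstance(r, BaseException)}
    return ok, failed

ok, failed = asyncio.run(main())
print(ok)  # {0: 0, 2: 20}
print({n: str(e) for n, e in failed.items()})  # {1: 'bad input 1', 3: 'bad input 3'}
```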

Comparing asyncio Functions

asyncio.gather() vs asyncio.wait()

Feature             | gather()                          | wait()
Simple usage        | ✅ Easier                         | ❌ More verbose
Get results         | ✅ Automatic                      | ❌ Manual extraction
Return value        | List of results                   | Tuple of (done, pending) task sets
Timeout             | ❌ No direct timeout              | ✅ Built-in timeout parameter
Cancel all on error | ❌ No, other tasks keep running   | ❌ No automatic cancellation

Use gather(): Most cases, simpler code
Use wait(): When you need fine-grained control over timeout or completion mode
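
For comparison, here is a rough sketch of wait() with its timeout parameter. The work helper and the delays are illustrative assumptions. Unlike wait_for(), wait() does not raise on timeout; it simply reports which tasks are done and which are still pending, and leaves cancellation to the caller.

```python
import asyncio

async def work(name, delay):
    # Hypothetical job that finishes after `delay` seconds.
    await asyncio.sleep(delay)
    return name

async def main():
    # wait() needs Tasks or Futures; bare coroutines are rejected on Python 3.11+.
    tasks = [
        asyncio.create_task(work("fast", 0.05)),
        asyncio.create_task(work("slow", 5)),
    ]
    done, pending = await asyncio.wait(tasks, timeout=0.5)
    finished = sorted(t.result() for t in done)
    # Nothing was cancelled by wait(); clean up the stragglers explicitly.
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return finished, len(pending)

finished, still_pending = asyncio.run(main())
print(finished, still_pending)  # ['fast'] 1
```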

asyncio.gather() vs asyncio.create_task()

# gather() - simpler
results = await asyncio.gather(task1(), task2(), task3())

# create_task() - more control
t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())
t3 = asyncio.create_task(task3())
results = [await t1, await t2, await t3]

Use gather() for simplicity. Use create_task() when you need to store task references for later cancellation.
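
A small sketch of the "store a reference, cancel later" case follows. The names poll_forever and compute are hypothetical; the point is only that a Task handle lets you stop background work once it is no longer needed.

```python
import asyncio

async def poll_forever():
    # Hypothetical background job that never finishes on its own.
    while True:
        await asyncio.sleep(0.01)

async def compute():
    await asyncio.sleep(0.05)
    return 42

async def main():
    poller = asyncio.create_task(poll_forever())  # keep the reference
    answer = await compute()
    poller.cancel()  # stop the background job once the result is in
    try:
        await poller  # let the cancellation finish
    except asyncio.CancelledError:
        pass
    return answer, poller.cancelled()

print(asyncio.run(main()))  # (42, True)
```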

Advanced: Limiting Concurrent Tasks

What if you have 1000 URLs to fetch but don't want to create 1000 concurrent connections?

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def bounded_gather(coros, limit=10):
    """Run coroutines with a concurrency limit"""
    semaphore = asyncio.Semaphore(limit)

    async def bounded_coro(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(
        *[bounded_coro(coro) for coro in coros]
    )

async def main():
    urls = [f"https://example.com/{i}" for i in range(100)]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await bounded_gather(tasks, limit=10)  # Max 10 concurrent

    return results

Why use this: Prevents overwhelming the server or hitting rate limits. 10 concurrent connections is usually safer than 100.
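
To check that a semaphore really caps concurrency, the following sketch counts how many jobs are inside the guarded section at once. The counters and the job helper are assumptions made for this demonstration, and asyncio.sleep() stands in for the network request.

```python
import asyncio

async def main(total=20, limit=5):
    semaphore = asyncio.Semaphore(limit)
    running = 0  # jobs currently inside the semaphore
    peak = 0     # highest value `running` ever reached

    async def job(i):
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stands in for the network request
            running -= 1
            return i

    results = await asyncio.gather(*(job(i) for i in range(total)))
    return results, peak

results, peak = asyncio.run(main())
print(peak)  # 5
```

All 20 jobs are started by gather(), but only 5 at a time get past the semaphore; the rest wait their turn.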

Performance Comparison

Fetching 10 URLs, assuming each request takes about 1 second:

Sequential (no asyncio):       ~10 seconds
asyncio.gather() (concurrent): ~1 second
Speed improvement: roughly 10x

The more I/O-bound operations you have, the more you benefit from gather().
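
You can measure the effect yourself with a rough timing sketch like the one below. It simulates each request with a 0.1 second asyncio.sleep(), so the absolute numbers are illustrative assumptions rather than benchmark results; real network calls will vary.

```python
import asyncio
import time

async def fake_request(i, delay=0.1):
    # Simulated I/O: sleeps instead of touching the network.
    await asyncio.sleep(delay)
    return i

async def sequential(n):
    # Each await finishes before the next request starts.
    return [await fake_request(i) for i in range(n)]

async def concurrent(n):
    # All requests are started together and awaited as a group.
    return await asyncio.gather(*(fake_request(i) for i in range(n)))

def timed(coro):
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start

seq_result, seq_time = timed(sequential(10))
con_result, con_time = timed(concurrent(10))
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```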

Common Mistakes

❌ Mistake 1: Forgetting to await gather()

# WRONG
results = asyncio.gather(task1(), task2())  # Missing await!
print(results)  # Prints a pending Future object, not the results

# CORRECT
results = await asyncio.gather(task1(), task2())

❌ Mistake 2: Mixing synchronous and async code

# WRONG
async def main():
    result1 = fetch_data()  # Oops, forgot await!
    result2 = await fetch_data()

# CORRECT
async def main():
    result1 = await fetch_data()
    result2 = await fetch_data()

❌ Mistake 3: Not handling exceptions with return_exceptions=True

# WRONG - one failed task crashes everything
results = await asyncio.gather(
    fetch_user(1),
    fetch_user(2),
    fetch_user(3),
)  # If one fails, no partial results!

# CORRECT
results = await asyncio.gather(
    fetch_user(1),
    fetch_user(2),
    fetch_user(3),
    return_exceptions=True
)

Best Practices

  1. Use gather() for simple cases - Clean, readable code
  2. Add return_exceptions=True - Resilient to failures
  3. Limit concurrency - Use semaphores for large task counts
  4. Set timeouts - Wrap with asyncio.wait_for() for safety
  5. Log task completion - Use logging to debug concurrent issues
  6. Test failure scenarios - What happens if a task hangs?

Timeout Example

async def main():
    try:
        results = await asyncio.wait_for(
            asyncio.gather(
                fetch_data(1),
                fetch_data(2),
                fetch_data(3),
            ),
            timeout=5.0  # Wait max 5 seconds
        )
    except asyncio.TimeoutError:
        print("One or more tasks took too long")
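
When the timeout fires, wait_for() cancels the gather() call, which in turn cancels every task it is still waiting on. The sketch below makes that visible with a hypothetical slow helper that records when it is cancelled; the 0.1 second timeout is chosen only to keep the demo short.

```python
import asyncio

cancelled = []  # names of tasks that received a cancellation

async def slow(name):
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        cancelled.append(name)
        raise  # re-raise so the cancellation completes
    return name

async def main():
    try:
        await asyncio.wait_for(
            asyncio.gather(slow("a"), slow("b")),
            timeout=0.1,
        )
    except asyncio.TimeoutError:
        return "timed out"
    return "finished"

outcome = asyncio.run(main())
print(outcome, sorted(cancelled))  # timed out ['a', 'b']
```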

Conclusion

asyncio.gather() is the go-to tool for running multiple async operations concurrently in Python:

  • Faster for I/O-bound work - roughly 3x and 10x in the examples above, since total time approaches that of the slowest task
  • Simple API - pass coroutines, get results in order
  • Error handling - use return_exceptions=True for resilience
  • Scalable - use semaphores to limit concurrency

Master asyncio.gather() and you have a solid foundation for concurrent Python programming.

