Connection Pool

AsyncBlockpartyPool and SyncBlockpartyPool are convenience wrappers that create and cache clients per chain. They use a ProviderSet under the hood, so fallback and rate limiting work automatically.

from blockparty import AsyncBlockpartyPool, ProviderCredential, EtherscanTier

async with AsyncBlockpartyPool(
    credentials=[
        ProviderCredential(type="etherscan", api_key="KEY", tier=EtherscanTier.STANDARD),
        ProviderCredential(type="routescan"),
        ProviderCredential(type="blockscout"),
    ],
) as pool:
    base_txs = await pool.get_internal_transactions(chain_id=8453, address="0x...", limit=10)
    arb_txs = await pool.get_internal_transactions(chain_id=42161, address="0x...", limit=10)

    # URL for the preferred explorer
    print(pool.urls(8453).tx("0xabc123..."))

    # URL for the explorer that actually answered
    print(pool.urls_for(8453, base_txs.provider).tx("0xabc123..."))

You can also pass an existing ProviderSet:

pool = AsyncBlockpartyPool(providers=my_provider_set)

Sync pool

from blockparty import SyncBlockpartyPool, ProviderCredential

with SyncBlockpartyPool(
    credentials=[
        ProviderCredential(type="routescan"),
        ProviderCredential(type="blockscout"),
    ],
) as pool:
    resp = pool.get_internal_transactions(chain_id=8453, address="0x...", limit=10)