#!/usr/bin/env python3
"""
Fetch actual data for transport indicators from the World Bank Data360 API.
Can pull data for specific indicators, countries, and time ranges.

Usage:
    python3 fetch-data.py                                     # all core transport indicators
    python3 fetch-data.py --indicator WB_WDI_IS_RRS_PASG_KM
    python3 fetch-data.py --database UNCTAD_MT
    python3 fetch-data.py --country GBR,KEN,IND
"""
import argparse
import csv
import json
import sys
from pathlib import Path

import requests

BASE = "https://data360api.worldbank.org/data360"
RESULTS_DIR = Path(__file__).parent.parent / "results"
DATA_DIR = Path(__file__).parent.parent / "data-pulls"
DATA_DIR.mkdir(exist_ok=True)

# Core transport indicators - the most directly relevant ones
CORE_TRANSPORT_INDICATORS = {
    # Air transport
    "WB_WDI_IS_AIR_GOOD_MT_K1": {"db": "WB_WDI", "name": "Air transport, freight (million ton-km)"},
    "WB_WDI_IS_AIR_PSGR": {"db": "WB_WDI", "name": "Air transport, passengers carried"},
    "WB_WDI_IS_AIR_DPRT": {"db": "WB_WDI", "name": "Air transport, registered carrier departures"},
    # Railways
    "WB_WDI_IS_RRS_PASG_KM": {"db": "WB_WDI", "name": "Railways, passengers carried (million passenger-km)"},
    "WB_WDI_IS_RRS_GOOD_MT_K6": {"db": "WB_WDI", "name": "Railways, goods transported (million ton-km)"},
    # Road safety
    "WB_WDI_SH_STA_TRAF_P5": {"db": "WB_WDI", "name": "Mortality caused by road traffic injury (per 100,000)"},
    # Maritime / shipping
    "UNCTAD_MT_PORT": {"db": "UNCTAD_MT", "name": "Number of Port calls (All ships, annual)"},
    "UNCTAD_MT_PORT_TIME": {"db": "UNCTAD_MT", "name": "Median time in port (days)"},
    "UNCTAD_MT_CONTAINER_THROUGHPUT": {"db": "UNCTAD_MT", "name": "Container port throughput (TEU)"},
    "UNCTAD_LSC_INDEX": {"db": "UNCTAD_LSC", "name": "Liner shipping connectivity index"},
    # Logistics
    "WB_LPI_LP_LPI_OVRL_XQ": {"db": "WB_LPI", "name": "Logistics performance index: Overall"},
    "WB_LPI_LP_LPI_INFR_XQ": {"db": "WB_LPI", "name": "Transport infrastructure quality"},
    # Infrastructure quality
    "WEF_GCI_GCI4_A_02_01": {"db": "WEF_GCI", "name": "GCI 4.0: Transport infrastructure"},
    # Climate / emissions
    "WRI_CLIMATEWATCH_ALL_GHG_TRANSPORT": {"db": "WRI_CLIMATEWATCH", "name": "GHG emissions - transport sector"},
    # Rural access
    "UN_SDG_SP_ROD_R2KM": {"db": "UN_SDG", "name": "Rural population within 2km of all-season road (%)"},
    # Private investment
    "WB_PPI_TRN_INV": {"db": "WB_PPI", "name": "Private sector participation in transport (USD)"},
}


def fetch_data(database_id, indicator=None, country=None, time_from=None, time_to=None, max_records=10000):
    """Fetch data from Data360 API with pagination."""
    all_records = []
    skip = 0
    batch = 1000

    params = {"DATABASE_ID": database_id, "skip": skip}
    if indicator:
        params["INDICATOR"] = indicator
    if country:
        params["REF_AREA"] = country
    if time_from:
        params["timePeriodFrom"] = time_from
    if time_to:
        params["timePeriodTo"] = time_to

    while skip < max_records:
        params["skip"] = skip
        resp = requests.get(f"{BASE}/data", params=params)
        resp.raise_for_status()
        data = resp.json()

        records = data.get("value", [])
        if not records:
            break
        all_records.extend(records)

        total = data.get("count", 0)
        skip += batch
        if skip >= total:
            break

    return all_records, data.get("count", len(all_records))


def main():
    parser = argparse.ArgumentParser(description="Fetch transport data from World Bank Data360")
    parser.add_argument("--indicator", "-i", help="Specific indicator ID")
    parser.add_argument("--database", "-d", help="Specific database ID")
    parser.add_argument("--country", "-c", help="Country codes (comma-separated, e.g. GBR,KEN)")
    parser.add_argument("--from-year", default="2000", help="Start year (default: 2000)")
    parser.add_argument("--to-year", default="2024", help="End year (default: 2024)")
    parser.add_argument("--list", action="store_true", help="List core transport indicators")
    args = parser.parse_args()

    if args.list:
        print("Core Transport Indicators:")
        for idno, info in sorted(CORE_TRANSPORT_INDICATORS.items()):
            print(f"  {idno} [{info['db']}]: {info['name']}")
        return

    if args.indicator:
        # Fetch a specific indicator
        if args.indicator in CORE_TRANSPORT_INDICATORS:
            db = CORE_TRANSPORT_INDICATORS[args.indicator]["db"]
        elif args.database:
            db = args.database
        else:
            print(f"Error: specify --database for non-core indicator {args.indicator}")
            sys.exit(1)
        indicators_to_fetch = [(args.indicator, db)]
    elif args.database:
        # All transport indicators from a specific database
        with open(RESULTS_DIR / "transport-indicators-filtered.json") as f:
            all_inds = json.load(f)
        indicators_to_fetch = [
            (ind["idno"], ind["database_id"])
            for ind in all_inds
            if ind.get("database_id") == args.database and ind.get("idno")
        ]
    else:
        # All core indicators
        indicators_to_fetch = [(idno, info["db"]) for idno, info in CORE_TRANSPORT_INDICATORS.items()]

    print(f"Fetching {len(indicators_to_fetch)} indicators...")
    all_data = []

    for idno, db_id in indicators_to_fetch:
        print(f"\n  Fetching: {idno} from {db_id}")
        try:
            records, total = fetch_data(
                database_id=db_id,
                indicator=idno,
                country=args.country,
                time_from=args.from_year,
                time_to=args.to_year,
            )
            print(f"    Records: {len(records)} / {total}")
            all_data.extend(records)
        except Exception as e:
            print(f"    Error: {e}")

    # Save as JSON
    outfile = DATA_DIR / "transport-data.json"
    with open(outfile, "w") as f:
        json.dump(all_data, f, indent=2)
    print(f"\nSaved {len(all_data)} records to {outfile}")

    # Also save as CSV for easy analysis
    if all_data:
        csv_file = DATA_DIR / "transport-data.csv"
        fields = ["INDICATOR", "REF_AREA", "TIME_PERIOD", "OBS_VALUE", "DATABASE_ID",
                  "UNIT_MEASURE", "FREQ", "OBS_STATUS"]
        with open(csv_file, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
            writer.writeheader()
            writer.writerows(all_data)
        print(f"Saved CSV to {csv_file}")


if __name__ == "__main__":
    main()