Creating a Daily Bulk Ingestion Pipeline for Historical Price Data and Fundamentals
Automate the flow of financial data into your own database
In the finance field, we are usually trying to answer two related questions at the same time:
What did the market do?
What did the business do?
Prices move every trading day, reflecting new information and expectations. However, fundamentals update more slowly and in batches because public companies report on a cycle (e.g., U.S. issuers file Form 10-Q after the first three fiscal quarters and an annual 10-K). This mismatch becomes a pain point for valuation and screening work: we need both kinds of data as of a specific point in time, yet ad hoc pulls make that point in time inconsistent.
This is why a daily ingestion pipeline exists. It gives us a consistent record that we reuse without re-downloading or questioning what we just pulled. Instead of relying on a live fetch each time, we can maintain a small local dataset that updates on schedule and is ready for further processing.
In this article, we will learn how to develop a daily bulk ingestion pipeline for historical price and fundamental data, sourced from Financial Modeling Prep (FMP).
Curious about it? Let’s get into it.
Foundation
Before we move into the implementation details, it helps to treat this project as an ingestion layer built on top of an external data provider. Building this layer on top of the Financial Modeling Prep (FMP) API offers several practical benefits for financial analysis work.
First, it reduces duplication by reusing steps for requesting data, validating responses, standardising column names, and applying rules (e.g., date handling) for each symbol.
Second, it creates a single control point for the workflow, centralizing API key handling and daily logic rather than duplicating logic across scripts.
Third, it provides a stable historical record by maintaining a local dataset rather than recomputing results from live calls, thereby simplifying research and reporting.
Finally, it supports routine operation with two phases: an initial backfill to build historical coverage and a daily run to keep data current. Once scheduled, the dataset is automatically updated, ensuring a reliable workflow.
The Data Source
Let’s start building our daily ingestion pipeline by deciding which datasets we will pull from FMP. In this project, all data comes from FMP’s Stable API, which uses a single base URL and a consistent URL pattern:
https://financialmodelingprep.com/stable/

In practice, FMP provides many endpoints, but this pipeline intentionally uses only a small subset. The goal is to identify the minimum datasets required to build a reliable store of historical prices and core financial statements, without introducing optional datasets that complicate maintenance.
For this pipeline, we rely on these endpoints:
Company search (search-symbol): Lets you search by company name or partial ticker and returns candidates with symbols, names, exchanges, and currencies.
Company profile (profile): Returns the baseline company metadata you typically want to store alongside your price and fundamentals tables.
Income statement (income-statement): Provides revenue, net income, and other income statement fields over time.
Balance sheet statement (balance-sheet-statement): Provides assets, liabilities, and equity fields that help you understand the company's financial position.
Cash flow statement (cash-flow-statement): Provides operating, investing, and financing cash flow fields, which are essential for evaluating cash generation and sustainability.
Historical end-of-day prices (historical-price-eod/full): Provides daily OHLCV and related fields for historical price storage.
These datasets are sufficient to build a clean ingestion pipeline that stores daily prices by date and financial statements by reporting period, while keeping the system simple and easy to run every day.
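To make the URL pattern concrete, here is roughly what two of these requests look like; YOUR_KEY stands in for a real API key, and the exact response fields can vary by plan:

https://financialmodelingprep.com/stable/profile?symbol=AAPL&apikey=YOUR_KEY
https://financialmodelingprep.com/stable/historical-price-eod/full?symbol=AAPL&from=2024-01-01&to=2024-12-31&apikey=YOUR_KEY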
Project structure
This project is intentionally organised to separate the application, data storage, and entry points.
A simplified view of the project looks like this:
fmp_daily_ingestion/
├─ .github/
│ └─ workflows/
│ └─ daily_ingestion.yml
├─ app/
│ ├─ __init__.py
│ ├─ db.py
│ ├─ fmp_client.py
│ ├─ pipeline.py
│ └─ settings.py
├─ data/
│ ├─ fmp.sqlite3
│ └─ scheduler.log
├─ scripts/
│ ├─ __init__.py
│ ├─ backfill_symbols.py
│ ├─ backfill_prices.py
│ ├─ run_daily.py
│ ├─ scheduler.py
│ └─ check_db.py
├─ .env
└─ requirements.txt

Once we establish the project foundations, we will build our daily ingestion pipeline.
Step-by-Step Walkthrough
In this section, we will go through how our daily ingestion pipeline is built in each step.
Step 1: Define dependencies and configuration
First, we set up the requirements.txt file, keeping the dependencies minimal.
requests
python-dotenv
pandas
schedule

We also define our .env file, which supplies runtime configuration without hardcoding secrets or machine-specific paths into code.
FMP_API_KEY=YOUR_KEY
FMP_STABLE_BASE_URL=https://financialmodelingprep.com/stable
DB_PATH=data/fmp.sqlite3
FMP_WATCHLIST=AAPL,MSFT,TSLA
FUNDAMENTALS_PERIODS_TO_REFRESH=4
REQUEST_TIMEOUT=30
REQUEST_SLEEP=0.15

FMP's Stable API uses a single base URL and authentication through an API key passed as a query parameter.
Step 2: Establish a single configuration contract
Next, we create settings.py, which helps every script and module read the configuration consistently. This module does the following:
load .env
validate required values (especially FMP_API_KEY)
provide defaults for optional settings
Our implementation looks like this:
# app/settings.py
import os

from dotenv import load_dotenv

# Load .env file explicitly
load_dotenv()

FMP_API_KEY = os.getenv("FMP_API_KEY")
if not FMP_API_KEY:
    raise RuntimeError("Missing FMP_API_KEY. Set it as an environment variable or in .env file.")

# Base URLs: this pipeline uses Stable for all endpoints; a V3 base URL is kept as an optional fallback.
FMP_STABLE_BASE_URL = os.getenv("FMP_STABLE_BASE_URL", "https://financialmodelingprep.com/stable").rstrip("/")
FMP_V3_BASE_URL = os.getenv("FMP_V3_BASE_URL", "https://financialmodelingprep.com/api/v3").rstrip("/")

WATCHLIST = [s.strip().upper() for s in os.getenv("FMP_WATCHLIST", "AAPL,MSFT,TSLA").split(",") if s.strip()]
DB_PATH = os.getenv("DB_PATH", "data/fmp.sqlite3")

# Daily fundamentals: fetch last N rows and upsert (simple + idempotent).
FUNDAMENTALS_PERIODS_TO_REFRESH = int(os.getenv("FUNDAMENTALS_PERIODS_TO_REFRESH", "4"))

REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "30"))
REQUEST_SLEEP = float(os.getenv("REQUEST_SLEEP", "0.15"))

This becomes the project's control plane: whether you later run the project locally, in GitHub Actions, or under a scheduler, you do not change any application code, only the environment values.
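For example, once the entry-point scripts from Step 6 exist, you can point the same code at a different watchlist or database purely through environment variables; the tickers and path below are arbitrary placeholders:

# same entry point, different configuration, no code changes
FMP_WATCHLIST=NVDA,AMD DB_PATH=data/test.sqlite3 python scripts/run_daily.py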
Step 3: Implement a Stable API client
In this section, we will build our client script in fmp_client.py. The client should be the only module that knows how to:
build Stable URLs
attach apikey=...
enforce timeouts and basic pacing
raise clear errors when a request fails
The code we used will look like this:
# app/fmp_client.py
from __future__ import annotations

import time
from typing import Any, Dict, Optional

import requests
from requests.adapters import HTTPAdapter
from urllib3.util import Retry

from app.settings import FMP_API_KEY, FMP_STABLE_BASE_URL, REQUEST_TIMEOUT, REQUEST_SLEEP


class FMPClient:
    """
    Stable-only client (current docs):
      Base URL: https://financialmodelingprep.com/stable/
      Auth: apikey=<YOUR_KEY>

    The Stable quickstart confirms the base URL + apikey query auth.
    The historical EOD endpoint lives under Stable as well.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        stable_base_url: Optional[str] = None,
        v3_base_url: Optional[str] = None,  # accepted for compatibility; unused by this Stable-only client
        timeout_s: Optional[int] = None,
        sleep_s: Optional[float] = None,
        session: Optional[requests.Session] = None,
    ) -> None:
        self.api_key = (api_key or FMP_API_KEY or "").strip()
        if not self.api_key:
            raise RuntimeError("Missing FMP_API_KEY. Set it in .env or environment variables.")
        self.base_url = (stable_base_url or FMP_STABLE_BASE_URL or "https://financialmodelingprep.com/stable").rstrip("/")
        self.timeout_s = int(timeout_s if timeout_s is not None else REQUEST_TIMEOUT)
        self.sleep_s = float(sleep_s if sleep_s is not None else REQUEST_SLEEP)
        self.session = session or requests.Session()
        if not session:
            # Configure retries for transient failures and rate limiting
            retry_strategy = Retry(
                total=5,
                backoff_factor=1,
                status_forcelist=[429, 500, 502, 503, 504],
                allowed_methods=["GET"],
                raise_on_status=True,
            )
            adapter = HTTPAdapter(max_retries=retry_strategy)
            self.session.mount("https://", adapter)
            self.session.mount("http://", adapter)

    def _get_json(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Any:
        params = dict(params or {})
        params["apikey"] = self.api_key
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        resp = self.session.get(url, params=params, timeout=self.timeout_s)
        if resp.status_code == 402:
            raise RuntimeError(f"FMP 402 (Restricted Endpoint) for {url}: {resp.text[:300]}")
        if not resp.ok:
            raise RuntimeError(f"FMP error {resp.status_code} for {url}: {resp.text[:300]}")
        if self.sleep_s > 0:
            time.sleep(self.sleep_s)
        return resp.json()

    # Symbols
    def fetch_financial_statement_symbol_list(self) -> Any:
        """/stable/financial-statement-symbol-list"""
        return self._get_json("financial-statement-symbol-list")

    def fetch_profile(self, symbol: str) -> Any:
        """/stable/profile?symbol=AAPL"""
        return self._get_json("profile", {"symbol": symbol.upper()})

    # Prices (Stable)
    def fetch_historical_price_eod_full(
        self,
        symbol: str,
        date_from: Optional[str] = None,
        date_to: Optional[str] = None,
    ) -> Any:
        """
        Stable historical EOD (full):
          /historical-price-eod/full?symbol=AAPL
        """
        params: Dict[str, Any] = {"symbol": symbol.upper()}
        if date_from:
            params["from"] = date_from
        if date_to:
            params["to"] = date_to
        return self._get_json("historical-price-eod/full", params)

    # Fundamentals (Stable)
    def fetch_income_statement(self, symbol: str) -> Any:
        return self._get_json("income-statement", {"symbol": symbol.upper()})

    def fetch_balance_sheet(self, symbol: str) -> Any:
        return self._get_json("balance-sheet-statement", {"symbol": symbol.upper()})

    def fetch_cash_flow(self, symbol: str) -> Any:
        return self._get_json("cash-flow-statement", {"symbol": symbol.upper()})

These endpoints correspond directly to the Stable documentation for company profile, income statement, and historical EOD prices.
Step 4: Define the schema and write layer for data storage
In this section, we will define what we store and how we update it safely within the db.py file.
The code implementation will be as follows:
# app/db.py
import os
import sqlite3
from typing import Optional, Sequence, Tuple

DDL = """
CREATE TABLE IF NOT EXISTS symbols (
    symbol TEXT PRIMARY KEY,
    name TEXT,
    exchange TEXT,
    currency TEXT
);

CREATE TABLE IF NOT EXISTS prices_eod (
    symbol TEXT NOT NULL,
    date TEXT NOT NULL,
    open REAL,
    high REAL,
    low REAL,
    close REAL,
    volume REAL,
    PRIMARY KEY (symbol, date)
);

CREATE TABLE IF NOT EXISTS financials (
    symbol TEXT NOT NULL,
    period_end_date TEXT NOT NULL,
    statement_type TEXT NOT NULL,
    year INTEGER,
    period TEXT,
    payload_json TEXT NOT NULL,
    PRIMARY KEY (symbol, period_end_date, statement_type)
);
"""


def connect(db_path: str) -> sqlite3.Connection:
    # Ensure the parent directory exists before opening the database file.
    os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL;")
    conn.execute("PRAGMA synchronous=NORMAL;")
    return conn


def init_db(conn: sqlite3.Connection) -> None:
    conn.executescript(DDL)
    conn.commit()


def upsert_symbols(conn: sqlite3.Connection, rows: Sequence[Tuple]) -> None:
    conn.executemany(
        """
        INSERT INTO symbols (symbol, name, exchange, currency)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(symbol) DO UPDATE SET
            name=excluded.name,
            exchange=excluded.exchange,
            currency=excluded.currency
        """,
        rows,
    )
    conn.commit()


def upsert_prices(conn: sqlite3.Connection, rows: Sequence[Tuple]) -> None:
    conn.executemany(
        """
        INSERT INTO prices_eod (symbol, date, open, high, low, close, volume)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(symbol, date) DO UPDATE SET
            open=excluded.open,
            high=excluded.high,
            low=excluded.low,
            close=excluded.close,
            volume=excluded.volume
        """,
        rows,
    )
    conn.commit()


def upsert_financials(conn: sqlite3.Connection, rows: Sequence[Tuple]) -> None:
    conn.executemany(
        """
        INSERT INTO financials (symbol, period_end_date, statement_type, year, period, payload_json)
        VALUES (?, ?, ?, ?, ?, ?)
        ON CONFLICT(symbol, period_end_date, statement_type) DO UPDATE SET
            year=excluded.year,
            period=excluded.period,
            payload_json=excluded.payload_json
        """,
        rows,
    )
    conn.commit()


def read_symbols(conn: sqlite3.Connection, limit: Optional[int] = None) -> list[str]:
    q = "SELECT symbol FROM symbols ORDER BY symbol"
    if limit:
        q += " LIMIT ?"
        cur = conn.execute(q, (limit,))
    else:
        cur = conn.execute(q)
    return [r[0] for r in cur.fetchall()]

The code above is designed as follows:
symbols is the reference table
prices_eod stores daily OHLCV keyed by (symbol, date)
financials stores statement rows keyed by (symbol, period_end_date, statement_type)
The purpose of this layer is not only persistence but also operational reliability. With primary keys and upserts in place, we can rerun backfills and daily jobs without creating duplicates.
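To see this in practice, the short sketch below uses an in-memory SQLite database and the helpers above to upsert the same price row twice; the numbers are made up for illustration:

# idempotency sketch: re-upserting the same (symbol, date) row does not duplicate it
import sqlite3
from app.db import init_db, upsert_prices

conn = sqlite3.connect(":memory:")
init_db(conn)

row = [("AAPL", "2024-01-02", 187.15, 188.44, 183.89, 185.64, 82488700.0)]  # illustrative values
upsert_prices(conn, row)
upsert_prices(conn, row)  # second run hits ON CONFLICT and updates in place

print(conn.execute("SELECT COUNT(*) FROM prices_eod").fetchone()[0])  # -> 1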
Step 5: Convert API responses into data rows
In this section, we define pipeline.py, the script that encodes the ingestion rules. The script should do the following:
normalize FMP response shapes
shape raw records into tuples that match table definitions
return those tuples so the DB layer can upsert them
The whole code implementation is as follows:
# app/pipeline.py
from __future__ import annotations

import json
import sqlite3
from typing import Any, Dict, Iterable, List, Optional, Tuple

from app.fmp_client import FMPClient
from app.db import upsert_symbols, upsert_prices, upsert_financials


def _as_list(payload: Any) -> List[Dict[str, Any]]:
    """
    Stable endpoints typically return a JSON array.
    This helper makes the pipeline robust if the response is wrapped.
    """
    if isinstance(payload, list):
        return [x for x in payload if isinstance(x, dict)]
    if isinstance(payload, dict):
        for key in ("data", "results", "historical"):
            v = payload.get(key)
            if isinstance(v, list):
                return [x for x in v if isinstance(x, dict)]
    return []


# 1) Symbols
def seed_symbols(conn: sqlite3.Connection, client: FMPClient, symbols: Optional[Iterable[str]] = None) -> int:
    """
    Seeds the symbols table. If symbols are provided, it enriches them via /profile.
    If none are provided, it falls back to a global list (the free tier usually restricts this).
    Returns the count of symbols processed.
    """
    if symbols:
        rows: List[Tuple] = []
        for s in symbols:
            sym = s.strip().upper()
            if not sym:
                continue
            prof = client.fetch_profile(sym)
            p = _as_list(prof)
            row = p[0] if p else {}
            name = row.get("companyName") or row.get("name")
            exchange = row.get("exchange") or row.get("exchangeShortName")
            currency = row.get("currency")
            rows.append((sym, name, exchange, currency))
        if rows:
            upsert_symbols(conn, rows)
        return len(rows)
    else:
        # Fallback to fetching a list if possible (Stable API exposes financial-statement-symbol-list)
        payload = client.fetch_financial_statement_symbol_list()
        items = _as_list(payload)
        rows = []
        for r in items:
            sym = (r.get("symbol") or r.get("ticker") or "").strip().upper()
            if not sym:
                continue
            rows.append((
                sym,
                r.get("name") or r.get("companyName"),
                r.get("exchange") or r.get("exchangeShortName"),
                r.get("currency"),
            ))
        if rows:
            upsert_symbols(conn, rows)
        return len(rows)


# 2) Prices
def backfill_prices_for_symbol(
    client: FMPClient,
    symbol: str,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    timeseries: Optional[int] = None,  # legacy option; used here as a "last N bars" slice
) -> List[Tuple]:
    """
    Returns rows for upsert_prices:
      (symbol, date, open, high, low, close, volume)
    """
    sym = symbol.strip().upper()
    payload = client.fetch_historical_price_eod_full(sym, date_from=date_from, date_to=date_to)
    bars = _as_list(payload)
    if timeseries:
        bars = bars[-int(timeseries):]
    out: List[Tuple] = []
    for b in bars:
        dt = b.get("date") or b.get("datetime") or b.get("time")
        if not dt:
            continue
        out.append((
            sym,
            str(dt),
            b.get("open"),
            b.get("high"),
            b.get("low"),
            b.get("close"),
            b.get("volume"),
        ))
    return out


def ingest_prices_for_date(
    conn: sqlite3.Connection,
    client: FMPClient,
    symbols: Iterable[str],
    target_date: str,
) -> int:
    """
    Daily run: fetch exactly one day per symbol and upsert it.
    """
    total = 0
    for s in symbols:
        rows = backfill_prices_for_symbol(client, s, date_from=target_date, date_to=target_date)
        if rows:
            upsert_prices(conn, rows)
            total += len(rows)
    return total


# 3) Fundamentals
def refresh_fundamentals(
    conn: sqlite3.Connection,
    client: FMPClient,
    symbols: Iterable[str],
    last_n: int = 4,
) -> int:
    """
    Refreshes the latest N financial statements for a watchlist.
    """
    total = 0
    for s in symbols:
        sym = s.strip().upper()
        bundles = [
            ("income_statement", client.fetch_income_statement(sym)),
            ("balance_sheet", client.fetch_balance_sheet(sym)),
            ("cash_flow", client.fetch_cash_flow(sym)),
        ]
        rows_to_upsert = []
        for statement_type, payload in bundles:
            rows = _as_list(payload)
            for r in rows[: int(last_n)]:
                period_end = r.get("date")
                if not period_end:
                    continue
                year = r.get("calendarYear") or r.get("year")
                period = r.get("period")
                rows_to_upsert.append((
                    sym,
                    str(period_end),
                    statement_type,
                    year,
                    period,
                    json.dumps(r, ensure_ascii=False),
                ))
        if rows_to_upsert:
            upsert_financials(conn, rows_to_upsert)
            total += len(rows_to_upsert)
    return total

From there, the pipeline functions become our project lifecycle:
Symbols seeding enriches a watchlist using the profile endpoint and creates rows for symbols. The profile endpoint is documented with symbol as a required query parameter.
Price backfill fetches historical EOD bars, maps each bar to (symbol, date, open, high, low, close, volume), then returns rows to be upserted into prices_eod.
Daily ingestion uses the same shaping rules but narrows the request to a single target date (typically yesterday), ensuring the daily mode is not a separate system but a constrained version of the same ingestion path.
Fundamentals refresh fetches the latest statement rows and stores them under a composite key.
The central principle is consistency in how every piece of data acquired from the FMP API is shaped and stored.
Step 6: Create runnable entry points
The scripts folder exists so we can run the pipeline without rewriting glue code each time. Each script should follow the same pattern:
import settings
connect and initialise DB
instantiate FMPClient
call pipeline functions
upsert results
print a concise summary
In this project, the scripts map directly to operational phases:
backfill_symbols.py seeds your symbols table from WATCHLIST:
import sys
from pathlib import Path

# Add project root to sys.path
sys.path.append(str(Path(__file__).parent.parent))

from app.settings import (
    DB_PATH, FMP_API_KEY, FMP_STABLE_BASE_URL, FMP_V3_BASE_URL, WATCHLIST
)
from app.db import connect, init_db
from app.fmp_client import FMPClient
from app.pipeline import seed_symbols


def main():
    conn = connect(DB_PATH)
    init_db(conn)
    client = FMPClient(
        api_key=FMP_API_KEY,
        stable_base_url=FMP_STABLE_BASE_URL,
        v3_base_url=FMP_V3_BASE_URL,
    )
    n = seed_symbols(conn, client, WATCHLIST)
    print(f"Seeded {n} symbols into DB ({DB_PATH}) from WATCHLIST.")


if __name__ == "__main__":
    main()

backfill_prices.py performs the historical loading for prices_eod:
import sys
from pathlib import Path

# Add project root to sys.path
sys.path.append(str(Path(__file__).parent.parent))

import argparse

from app.settings import (
    DB_PATH, FMP_API_KEY, FMP_STABLE_BASE_URL, FMP_V3_BASE_URL, WATCHLIST
)
from app.db import connect, init_db, read_symbols, upsert_prices
from app.fmp_client import FMPClient
from app.pipeline import backfill_prices_for_symbol


def main() -> None:
    ap = argparse.ArgumentParser()
    ap.add_argument("--limit", type=int, default=None, help="Backfill only first N symbols from DB")
    ap.add_argument("--symbols", type=str, default=None, help="Comma-separated tickers (overrides WATCHLIST)")
    # Optional: limit how much history you pull
    ap.add_argument("--from-date", type=str, default=None, help="YYYY-MM-DD")
    ap.add_argument("--to-date", type=str, default=None, help="YYYY-MM-DD")
    ap.add_argument("--timeseries", type=int, default=None, help="Return last N days")
    args = ap.parse_args()

    conn = connect(DB_PATH)
    init_db(conn)

    if args.symbols:
        symbols = [s.strip().upper() for s in args.symbols.split(",") if s.strip()]
    else:
        # Use symbols stored in the DB; fall back to the watchlist if the DB is empty
        db_syms = read_symbols(conn, limit=args.limit)
        symbols = db_syms if db_syms else WATCHLIST

    client = FMPClient(
        api_key=FMP_API_KEY,
        stable_base_url=FMP_STABLE_BASE_URL,
        v3_base_url=FMP_V3_BASE_URL,
    )

    total_rows = 0
    for i, sym in enumerate(symbols, 1):
        rows = backfill_prices_for_symbol(
            client,
            sym,
            date_from=args.from_date,
            date_to=args.to_date,
            timeseries=args.timeseries,
        )
        if rows:
            upsert_prices(conn, rows)
            total_rows += len(rows)
        if i % 25 == 0:
            print(f"Processed {i}/{len(symbols)} symbols...")

    print(f"Done. Upserted {total_rows} price rows.")


if __name__ == "__main__":
    main()

run_daily.py runs the daily refresh (yesterday's prices + latest fundamentals):
import sys
from pathlib import Path

# Add project root to sys.path
sys.path.append(str(Path(__file__).parent.parent))

import datetime as dt

from app.settings import (
    DB_PATH, FMP_API_KEY, FMP_STABLE_BASE_URL, FMP_V3_BASE_URL, WATCHLIST, FUNDAMENTALS_PERIODS_TO_REFRESH
)
from app.db import connect, init_db
from app.fmp_client import FMPClient
from app.pipeline import ingest_prices_for_date, refresh_fundamentals


def main():
    # Defensive choice: target yesterday (today - 1 day) so the trading day is complete
    target_date = (dt.date.today() - dt.timedelta(days=1)).isoformat()

    conn = connect(DB_PATH)
    init_db(conn)
    client = FMPClient(
        api_key=FMP_API_KEY,
        stable_base_url=FMP_STABLE_BASE_URL,
        v3_base_url=FMP_V3_BASE_URL,
    )

    n_prices = ingest_prices_for_date(conn, client, WATCHLIST, target_date)
    n_fin = refresh_fundamentals(conn, client, WATCHLIST, last_n=FUNDAMENTALS_PERIODS_TO_REFRESH)
    print(f"[{target_date}] upserted {n_prices} price rows and {n_fin} fundamentals rows.")


if __name__ == "__main__":
    main()

scheduler.py runs run_daily.py on a local schedule and logs the output:
import sys
from pathlib import Path

# Add project root to sys.path
sys.path.append(str(Path(__file__).parent.parent))

import time
import schedule
import subprocess
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("data/scheduler.log"),
        logging.StreamHandler()
    ]
)


def run_job():
    logging.info("Starting daily ingestion job...")
    try:
        # Run run_daily.py as a subprocess
        result = subprocess.run(
            [sys.executable, "scripts/run_daily.py"],
            capture_output=True,
            text=True,
            check=True
        )
        logging.info(f"Job completed successfully:\n{result.stdout}")
    except subprocess.CalledProcessError as e:
        logging.error(f"Job failed with error:\n{e.stderr}")
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")


def main():
    # Schedule the job for 01:00 AM every day
    # You can change this time as needed
    schedule.every().day.at("01:00").do(run_job)

    logging.info("Scheduler started. Ingestion job scheduled for 01:00 AM daily.")
    logging.info("Press Ctrl+C to exit.")

    try:
        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute
    except KeyboardInterrupt:
        logging.info("Scheduler stopped by user.")


if __name__ == "__main__":
    main()

check_db.py verifies table counts, date ranges, and recent rows:
import sys
from pathlib import Path

# Add project root to sys.path
sys.path.append(str(Path(__file__).parent.parent))

import sqlite3
import pandas as pd

from app.settings import DB_PATH


def main():
    print(f"Checking database at: {DB_PATH}")
    con = sqlite3.connect(DB_PATH)
    try:
        print("\n--- Row Counts ---")
        print(pd.read_sql("SELECT COUNT(*) AS n FROM symbols", con))
        print(pd.read_sql("SELECT COUNT(*) AS n FROM prices_eod", con))
        print(pd.read_sql("SELECT COUNT(*) AS n FROM financials", con))

        print("\n--- Price Statistics ---")
        print(pd.read_sql("SELECT MIN(date) AS min_date, MAX(date) AS max_date FROM prices_eod", con))

        print("\n--- Recent Prices (Last 5) ---")
        print(pd.read_sql("SELECT * FROM prices_eod ORDER BY date DESC LIMIT 5", con))

        print("\n--- Fundamentals Breakdown ---")
        print(pd.read_sql("SELECT statement_type, COUNT(*) AS n FROM financials GROUP BY statement_type", con))
    except Exception as e:
        print(f"Error checking DB: {e}")
    finally:
        con.close()


if __name__ == "__main__":
    main()

This separation keeps the project maintainable, and it lets us improve the pipeline in the future.
Step 7: The generated data folder
The data/ folder will contain the generated state:
fmp.sqlite3 (our SQLite database)
scheduler.log (our local scheduler audit trail, if you use it)
Nothing in data/ should be required for understanding the code. It is the product of running the pipeline.
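Once the pipeline has populated data/fmp.sqlite3, the file is ready for analysis. As a minimal sketch (assuming the default DB_PATH and a seeded watchlist), you can pull one symbol's closing prices straight into pandas:

# load closing prices for one symbol from the generated database (sketch)
import sqlite3
import pandas as pd

con = sqlite3.connect("data/fmp.sqlite3")
closes = pd.read_sql(
    "SELECT date, close FROM prices_eod WHERE symbol = ? ORDER BY date",
    con,
    params=("AAPL",),
)
con.close()
print(closes.tail())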
Step 8: Scheduling (local or GitHub Actions)
We have two scheduling modes: running locally or using GitHub Actions.
Local scheduling (scripts/scheduler.py) triggers the daily job at a fixed time and writes logs to data/scheduler.log. It is the simplest option when you control the machine.
GitHub Actions scheduling (.github/workflows/daily_ingestion.yml) runs the same daily script on a cron schedule and stores the SQLite database as a workflow artifact. GitHub's scheduled workflows are driven by cron syntax and operate in UTC. We can use the following YAML file:
name: Daily Data Ingestion

on:
  schedule:
    # Runs at 02:00 UTC every day
    - cron: '0 2 * * *'
  # Allows manual triggering
  workflow_dispatch:

jobs:
  ingest:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt

      - name: Run daily ingestion
        env:
          FMP_API_KEY: ${{ secrets.FMP_API_KEY }}
          FMP_WATCHLIST: ${{ vars.FMP_WATCHLIST }}
          DB_PATH: data/fmp.sqlite3
        run: |
          python scripts/run_daily.py

      - name: Upload database
        uses: actions/upload-artifact@v3
        with:
          name: fmp-database
          path: data/fmp.sqlite3

The important part is that both modes execute the same run_daily.py entry point and therefore share the same ingestion behaviour.
That is all for the project structure of our daily ingestion pipeline. In the next section, we will go through how to run the scripts step by step.
Running the scripts
All operational entry points live in the scripts folder. Each script adds the project root to sys.path, so the recommended way to execute them is from the repository root using python scripts/<name>.py.
1. Install dependencies
From the project root:
pip install -r requirements.txt

This installs the minimal runtime stack (requests, python-dotenv, pandas, schedule).
2. Configure .env
Before running anything, ensure .env defines at least:
FMP_API_KEY (acquire the key from the FMP site)
FMP_WATCHLIST (comma-separated tickers)
DB_PATH (for example data/fmp.sqlite3)
Our scripts read these values through app/settings.py and use them consistently across the pipeline.
3. Seed symbols into the database
Run the following script:
python scripts/backfill_symbols.py

This script connects to the SQLite database at DB_PATH, initializes the schema, instantiates FMPClient, and seeds the symbols table using your WATCHLIST.
When it completes, it prints a confirmation of the number of symbols seeded and the database file used. Something like:

Seeded 3 symbols into DB (data/fmp.sqlite3) from WATCHLIST.

4. Backfill historical prices
Run the following script:

python scripts/backfill_prices.py

This script is the one-time historical loader for prices_eod. It also initializes the database schema before writing. The example result is shown below:

Done. Upserted 3768 price rows.

Symbol selection follows the rule: if you provide --symbols, it uses that list; otherwise, it reads from the database and falls back to WATCHLIST if the database is empty.
You can keep the backfill controlled during testing or development by using the optional arguments defined in the script:
# Backfill only specific tickers
python scripts/backfill_prices.py --symbols AAPL,MSFT
# Backfill only first N symbols read from the DB
python scripts/backfill_prices.py --limit 10
# Limit history by date range
python scripts/backfill_prices.py --symbols AAPL --from-date 2024-01-01 --to-date 2024-12-31
# Limit history by "last N days" returned
python scripts/backfill_prices.py --symbols AAPL --timeseries 200

These flags correspond directly to the script's argument parser (--limit, --symbols, --from-date, --to-date, --timeseries).
During execution, it prints progress every 25 symbols and ends with the total number of upserted price rows.
5. Run the daily ingestion job
Run the following script:
python scripts/run_daily.py

This is the daily operational entry point: it computes the target_date as today minus one day, then performs two actions, price ingestion for that date and a fundamentals refresh for the watchlist. The fundamentals refresh window is controlled by FUNDAMENTALS_PERIODS_TO_REFRESH.
For example, the result is as follows:

[2026-02-13] upserted 3 price rows and 36 fundamentals rows.

6. Verify what was stored in SQLite
Run the following script:
python scripts/check_db.py

This script is your verification tool. It prints row counts for symbols, prices_eod, and financials, shows min/max dates in prices_eod, prints the last five price rows, and summarizes fundamentals by statement_type.
The example result is as follows:
Checking database at: data/fmp.sqlite3
--- Row Counts ---
n
0 3
n
0 3777
n
0 36
--- Price Statistics ---
min_date max_date
0 2021-02-10 2026-02-13
--- Recent Prices (Last 5) ---
symbol date open high low close volume
0 AAPL 2026-02-13 262.01 262.23 255.45 255.78 54927132.0
1 MSFT 2026-02-13 404.45 405.54 398.05 401.32 33949805.0
2 TSLA 2026-02-13 414.31 424.06 410.88 417.44 50565054.0
3 AAPL 2026-02-12 275.59 275.72 260.18 261.73 81077229.0
4 MSFT 2026-02-12 405.00 406.20 398.01 401.84 40802400.0
--- Fundamentals Breakdown ---
statement_type n
0 balance_sheet 12
1 cash_flow 12
2 income_statement 12

This script is used for a quick check after backfills or the daily job.
7. Automate the daily run
First, let’s take a look at the local scheduler, which runs on our machine:
Run the following script:
python scripts/scheduler.py

This schedules the job daily at 01:00 AM and runs scripts/run_daily.py as a subprocess, writing logs to data/scheduler.log and to stdout.
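If you prefer the operating system's scheduler to a long-running Python process, an equivalent crontab entry might look like the line below; the project path and interpreter are placeholders for your environment:

# run the daily ingestion at 01:00 local time (hypothetical paths)
0 1 * * * cd /path/to/fmp_daily_ingestion && /usr/bin/python3 scripts/run_daily.py >> data/scheduler.log 2>&1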
GitHub Actions (hosted schedule)
The workflow runs at 02:00 UTC daily, sets FMP_API_KEY, FMP_WATCHLIST, and DB_PATH=data/fmp.sqlite3, then executes python scripts/run_daily.py and uploads the SQLite file as an artifact. The schedule only takes effect once the workflow file has been pushed to the repository's default branch.
That’s all you need to understand how to build the daily ingestion pipeline with FMP.
Conclusion
In this article, we have learned how to build a small but reliable daily ingestion workflow that keeps two core financial datasets current: end-of-day prices and company fundamentals.
By relying on Financial Modeling Prep’s Stable API as the single upstream source, the pipeline remains consistent in how it authenticates, requests data, and standardizes responses, while remaining practical for routine use in research, screening, and internal analytics.
I hope it has helped!


