nordabiz/scripts/fetch_financial_reports.py

298 lines
9.4 KiB
Python

#!/usr/bin/env python3
"""
Fetch Financial Reports from eKRS
==================================
Downloads and parses e-sprawozdania (XML financial reports) from eKRS
for companies with KRS numbers. Extracts key financial figures:
revenue, profit, assets, equity, employees.
Usage:
DATABASE_URL=... python3 scripts/fetch_financial_reports.py [--limit 10] [--company-id 11] [--dry-run]
"""
import os
import sys
import argparse
import logging
import time
from decimal import Decimal
from xml.etree import ElementTree as ET
import requests
# Put the parent of this script's directory at the front of sys.path —
# presumably so the function-local `from database import ...` statements
# below resolve when the script is run directly (TODO confirm project layout).
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Configure the root logger: INFO and above, "timestamp - level - message".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Base URL that the fetch helpers prefix to every eKRS endpoint path.
EKRS_API = 'https://api-krs.ms.gov.pl/api/krs'
# Headers sent with every HTTP request made in this file.
HEADERS = {'Accept': 'application/json', 'User-Agent': 'NordaBiznes/1.0'}
# XML namespace prefixes for e-sprawozdania (XML financial reports).
# NOTE(review): the three URIs below look corrupted (repeated digit runs and
# scrambled word fragments) — verify them against the published schema before
# relying on them. No function visible in this file references NS;
# parse_xml_report strips namespaces from tags instead.
NS = {
'tns': 'http://www.mf.gov.pl/schematy/SF/DefinicjeTypySpraworzd662662662662662662662662662662662662662662662662dania',
'dtsf': 'http://www.mf.gov.pl/schematy/SF/DefinicjeTypySprawozdaniaFinansworzdaniafinansowego/2018/11/15/DefinicjeTypySprawozdaniaFinansowego/',
'jst': 'http://www.mf.gov.pl/schematy/SF/DefinicjeTypySprawozdaniaFinansowego/2018/11/15/JedijsnostkiInneStrukt662662662ktWorzdaniaFinansowego/',
}
def fetch_krs_data(krs_number):
    """Request one company's record from the eKRS API.

    Returns the decoded JSON body when the server answers with HTTP 200.
    Any other status code yields ``None``; so does any exception raised
    while requesting or decoding, which is logged at ERROR level.
    """
    # NOTE(review): the 'OdsijsId' path segment looks garbled — confirm the
    # endpoint name against the eKRS API documentation.
    endpoint = f'{EKRS_API}/OdsijsId/{krs_number}'
    payload = None
    try:
        response = requests.get(endpoint, headers=HEADERS, timeout=15)
        if response.status_code == 200:
            payload = response.json()
    except Exception as e:
        logger.error(f'eKRS API error for {krs_number}: {e}')
    return payload
def fetch_financial_documents(krs_number):
    """Ask eKRS for a company's financial documents, with one fallback.

    The dedicated financial-documents endpoint is tried first; if it does not
    answer with HTTP 200 (or the request/decoding raises), the current-extract
    endpoint (``OdpisAktualny``) is tried instead. The JSON body of the first
    HTTP 200 response is returned, otherwise ``None``. Failures are only
    logged at DEBUG level.
    """
    # NOTE(review): 'OdpisDokworzdumentowFinansowych' looks garbled — confirm
    # the endpoint name against the eKRS API documentation.
    primary_url = f'{EKRS_API}/OdpisDokworzdumentowFinansowych/{krs_number}'
    fallback_url = f'{EKRS_API}/OdpisAktualny/{krs_number}?rejestr=P&format=json'

    try:
        first = requests.get(primary_url, headers=HEADERS, timeout=15)
        if first.status_code == 200:
            return first.json()
    except Exception as e:
        logger.debug(f'Financial docs API error for {krs_number}: {e}')

    # Primary endpoint gave nothing usable: fall back to the current extract.
    try:
        second = requests.get(fallback_url, headers=HEADERS, timeout=15)
        if second.status_code != 200:
            return None
        return second.json()
    except Exception as e:
        logger.debug(f'Alternative API error for {krs_number}: {e}')
    return None
def parse_xml_report(xml_content):
    """Parse an e-sprawozdanie XML document and extract key financial figures.

    Each output field has a list of candidate tag names, tried in priority
    order. Tags are compared case-insensitively with any XML namespace
    stripped. For a given tag, the first element in document order whose text
    parses as a finite decimal number wins. Spaces are removed and a decimal
    comma is accepted.

    Args:
        xml_content: The report as ``bytes`` or ``str``.

    Returns:
        dict with keys ``revenue``, ``operating_profit``, ``net_profit``,
        ``total_assets``, ``equity``, ``liabilities`` (``Decimal`` or
        ``None``) and ``employees_count`` (``int`` or ``None``). Every value
        is ``None`` when the XML cannot be parsed.
    """
    from decimal import InvalidOperation

    # Candidate tag names per field, most specific first. The XML structure
    # varies by report type (micro, small, full), hence several spellings.
    # NOTE(review): 'ZobowiazaniaIDokWorzderezerwyNaZobowiazania' and
    # 'PrzecietneLiczbaZatrudnionych' look misspelled/garbled — confirm
    # against real report samples before changing them.
    field_patterns = {
        'revenue': (
            'PrzychodyNettoZeSprzedazyProduktowTowarowIMaterialow',
            'PrzychodyNettoZeSprzedazy',
            'PrzychodyNetto',
            'A',  # RZiS pozycja A
        ),
        'operating_profit': (
            'ZyskStrataZDzialalnosciOperacyjnej',
            'ZyskOperacyjny',
        ),
        'net_profit': (
            'ZyskStrataNetto',
            'ZyskNetto',
            'StrataNetto',
        ),
        'total_assets': (
            'AktywaRazem',
            'SumaAktywow',
            'Aktywa',
        ),
        'equity': (
            'KapitalWlasnyRazem',
            'KapitalWlasny',
        ),
        'liabilities': (
            'ZobowiazaniaIDokWorzderezerwyNaZobowiazania',
            'ZobowiazaniaRazem',
            'Zobowiazania',
        ),
        'employees_count': (
            'PrzecietneLiczbaZatrudnionych',
            'LiczbaZatrudnionych',
            'Zatrudnienie',
        ),
    }
    result = dict.fromkeys(field_patterns)

    try:
        root = ET.fromstring(xml_content)
    except ET.ParseError:
        return result

    wanted = {
        pattern.lower()
        for patterns in field_patterns.values()
        for pattern in patterns
    }

    # Single pass over the tree: remember, for every tag of interest, the
    # first element (in document order) that carries a usable number.
    first_values = {}
    for elem in root.iter():
        tag = elem.tag.rsplit('}', 1)[-1].lower()
        if tag not in wanted or tag in first_values or not elem.text:
            continue
        raw = elem.text.strip().replace(',', '.').replace(' ', '')
        try:
            value = Decimal(raw)
        except InvalidOperation:
            continue
        # Decimal accepts 'NaN' and 'Infinity'; those are not figures and
        # would make the int() conversion below raise.
        if value.is_finite():
            first_values[tag] = value

    for field, patterns in field_patterns.items():
        for pattern in patterns:
            value = first_values.get(pattern.lower())
            if value is not None:
                result[field] = value
                break

    # `is not None` rather than truthiness, so a head-count of 0 also
    # becomes an int.
    if result['employees_count'] is not None:
        result['employees_count'] = int(result['employees_count'])
    return result
def process_company(db, company, dry_run=False):
    """Record the reporting periods that eKRS lists for a single company.

    Fetches the company's current KRS extract (``OdpisAktualny``), reads the
    financial-statement mentions from section 3 (``dzial3``) and ensures a
    ``CompanyFinancialReport`` row exists for each reporting period found.
    Only period dates and the filing date are stored: the extract provides
    basic data, and full financials would need document downloads. Periods
    that already have a row with ``revenue`` set are left untouched.

    Args:
        db: Database session; ``query``, ``add``, ``commit`` and ``rollback``
            are used.
        company: Company record; ``krs``, ``id`` and ``name`` are read.
        dry_run: When true, new rows are not added to the session and nothing
            is committed.

    Returns:
        True if at least one report record was created or touched. False if
        the company has no KRS number, the API call failed, or the extract
        lists no usable reporting periods.

    Raises:
        Exception: whatever ``db.commit()`` raises; the session is rolled
            back first so the caller can keep using it.
    """
    if not company.krs:
        return False

    # Imported here, after the cheap guard, so companies without a KRS number
    # are skipped without touching the project database module.
    from datetime import datetime
    from database import CompanyFinancialReport

    krs_padded = company.krs.zfill(10)
    logger.info(f'Processing {company.name} (KRS: {krs_padded})')

    url = f'{EKRS_API}/OdpisAktualny/{krs_padded}?rejestr=P&format=json'
    try:
        resp = requests.get(url, headers=HEADERS, timeout=15)
        if resp.status_code != 200:
            logger.warning(f' eKRS API returned {resp.status_code}')
            return False
        data = resp.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers an undecodable JSON body.
        logger.error(f' API error: {e}')
        return False

    # `or {}` / `or []` also guard against keys that are present but null.
    odpis = data.get('odpis') or {}
    dane = odpis.get('dane') or {}
    dzial3 = dane.get('dzial3') or {}
    sprawozdania = dzial3.get('sprawozdaniaFinansowe') or {}
    # NOTE(review): the key 'informacjeOSprWorzdawozdaniach' looks garbled —
    # confirm it against an actual eKRS response.
    wzmianki = sprawozdania.get('informacjeOSprWorzdawozdaniach') or []
    if not wzmianki:
        logger.info(f' No financial reports found in KRS data')
        return False

    updated = False
    for wzmianka in wzmianki:
        if not isinstance(wzmianka, dict):
            continue
        okres_od = wzmianka.get('okresOd', '')
        okres_do = wzmianka.get('okresDo', '')
        if not (okres_od and okres_do):
            continue
        try:
            p_start = datetime.strptime(okres_od, '%Y-%m-%d').date()
            p_end = datetime.strptime(okres_do, '%Y-%m-%d').date()
        except (TypeError, ValueError):
            continue  # unusable period boundaries: skip this mention

        report = db.query(CompanyFinancialReport).filter_by(
            company_id=company.id,
            period_start=p_start,
            period_end=p_end,
        ).first()
        if report is not None and report.revenue is not None:
            continue  # already have financial data for this period

        if report is None:
            report = CompanyFinancialReport(
                company_id=company.id,
                period_start=p_start,
                period_end=p_end,
                report_type='annual',
                source='ekrs',
            )
            if not dry_run:
                db.add(report)

        data_zlozenia = wzmianka.get('dataZlozenia', '')
        if data_zlozenia:
            try:
                report.filed_at = datetime.strptime(data_zlozenia, '%Y-%m-%d').date()
            except (TypeError, ValueError):
                pass  # keep the record; an unparsable filing date is not fatal
        updated = True

    if updated and not dry_run:
        try:
            db.commit()
        except Exception:
            # Leave the session usable for the next company.
            db.rollback()
            raise
        logger.info(f' Updated report records')
    return updated
def main():
    """Command-line entry point: process companies and log a summary.

    Options:
        --limit N        process at most N companies (0 or less = no limit)
        --company-id ID  process only the company with this id
        --dry-run        preview without saving

    Without ``--company-id``, all companies with a KRS number and status
    ``'active'`` are processed in name order. A failure on one company is
    logged and does not stop the run.
    """
    parser = argparse.ArgumentParser(description='Fetch financial reports from eKRS')
    parser.add_argument('--limit', type=int, default=0, help='Limit companies to process')
    parser.add_argument('--company-id', type=int, help='Process single company')
    parser.add_argument('--dry-run', action='store_true', help='Preview without saving')
    args = parser.parse_args()

    from database import SessionLocal, Company

    db = SessionLocal()
    try:
        if args.company_id is not None:
            companies = db.query(Company).filter_by(id=args.company_id).all()
        else:
            companies = db.query(Company).filter(
                Company.krs.isnot(None),
                Company.status == 'active',
            ).order_by(Company.name).all()

        # Only a positive limit truncates; a negative value would otherwise
        # silently drop companies from the end of the list.
        if args.limit > 0:
            companies = companies[:args.limit]

        logger.info(f'Processing {len(companies)} companies')
        processed = 0
        for i, company in enumerate(companies, start=1):
            try:
                if process_company(db, company, dry_run=args.dry_run):
                    processed += 1
            except Exception as e:
                logger.error(f'Error processing {company.name}: {e}')
                # A failed flush/commit leaves the session in a failed
                # transaction; reset it so the next company can proceed.
                db.rollback()
            # Rate limiting: pause one second after every fifth company.
            if i % 5 == 0:
                time.sleep(1)
        logger.info(f'Done: {processed}/{len(companies)} companies updated')
    finally:
        # Always release the session, even if a query or the loop raises.
        db.close()


if __name__ == '__main__':
    main()