nordabiz/services/website_discovery_service.py
Maciej Pienczyn 11184c5a58
feat: scrape subpages (kontakt, o-nas) for NIP/REGON verification
The root page often lacks NIP/REGON. Now scrapes /kontakt/, /contact,
/o-nas and /o-firmie for strong verification signals, stopping early
once a NIP/REGON/KRS number is found.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 09:23:50 +01:00


"""
Website Discovery Service
==========================
Discovers websites for companies that don't have one registered.
Uses Brave Web Search API to find candidates, scrapes them for verification data,
and compares extracted information against known company data.
"""
import logging
import os
import re
import time
from datetime import datetime
from urllib.parse import urlparse

import requests
from bs4 import BeautifulSoup

from database import SessionLocal, Company, WebsiteDiscoveryCandidate
from utils.data_quality import update_company_data_quality

logger = logging.getLogger(__name__)

# Domains to skip - business directories, social media, own portal
DIRECTORY_DOMAINS = {
    # Business directories & registries
    'panoramafirm.pl', 'aleo.com', 'rejestr.io', 'krs-pobierz.pl',
    'gowork.pl', 'oferteo.pl', 'pkt.pl', 'firmy.net', 'zumi.pl',
    'baza-firm.com.pl', 'e-krs.pl', 'krs-online.com.pl', 'regon.info',
    'infoveriti.pl', 'companywall.pl', 'findcompany.pl', 'owg.pl',
    'imsig.pl', 'monitorfirm.pb.pl', 'mojepanstwo.pl', 'biznes-polska.pl',
    'zwiazekpracodawcow.pl', 'notariuszepl.top', 'wypr.pl', 'mapcarta.com',
    'analizy.pl', 'transfermarkt.pl', 'mojewejherowo.pl', 'orlyjubilerstwa.pl',
    'norda-biznes.info', 'bizraport.pl', 'aplikuj.pl', 'lexspace.pl',
    'drewnianeabc.pl', 'f-trust.pl', 'itspace.llc',
    'biznesfinder.pl', 'egospodarka.pl', 'bazatel.pl',
    'wspanialewesele.com.pl', 'wyszukiwarkakrs.pl', 'funduszowe.pl',
    'itspace.company',
    # Social media
    'facebook.com', 'linkedin.com', 'youtube.com', 'instagram.com',
    'twitter.com', 'x.com', 'tiktok.com',
    # Own portal & major sites
    'nordabiznes.pl', 'google.com', 'google.pl',
    'wikipedia.org', 'olx.pl', 'allegro.pl',
}

# --- Extraction helpers ---
def _normalize_nip(nip):
    return re.sub(r'[^0-9]', '', nip)


def _validate_nip(nip):
    nip = _normalize_nip(nip)
    if len(nip) != 10:
        return False
    weights = [6, 5, 7, 2, 3, 4, 5, 6, 7]
    try:
        checksum = sum(int(nip[i]) * weights[i] for i in range(9)) % 11
        return checksum == int(nip[9])
    except (ValueError, IndexError):
        return False
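
# Illustrative check with a synthetic test value (valid checksum, but not a
# real company's NIP): _validate_nip('123-456-32-18') -> True, because
# 1*6 + 2*5 + 3*7 + 4*2 + 5*3 + 6*4 + 3*5 + 2*6 + 1*7 = 118 and 118 % 11 == 8,
# which equals the 10th digit. A checksum of 10 can never equal a single
# digit, so such numbers are rejected implicitly.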


def _validate_regon(regon):
    regon = re.sub(r'[^0-9]', '', regon)
    if len(regon) == 9:
        weights = [8, 9, 2, 3, 4, 5, 6, 7]
        checksum = sum(int(regon[i]) * weights[i] for i in range(8)) % 11
        if checksum == 10:
            checksum = 0
        return checksum == int(regon[8])
    elif len(regon) == 14:
        weights9 = [8, 9, 2, 3, 4, 5, 6, 7]
        checksum9 = sum(int(regon[i]) * weights9[i] for i in range(8)) % 11
        if checksum9 == 10:
            checksum9 = 0
        if checksum9 != int(regon[8]):
            return False
        weights14 = [2, 4, 8, 5, 0, 9, 7, 3, 6, 1, 2, 4, 8]
        checksum14 = sum(int(regon[i]) * weights14[i] for i in range(13)) % 11
        if checksum14 == 10:
            checksum14 = 0
        return checksum14 == int(regon[13])
    return False
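
# Illustrative check with a synthetic value: _validate_regon('123456785') -> True,
# because 1*8 + 2*9 + 3*2 + 4*3 + 5*4 + 6*5 + 7*6 + 8*7 = 192 and 192 % 11 == 5,
# matching the 9th (check) digit; a checksum of 10 is mapped to 0 per the spec.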


def _find_nips_in_text(text):
    patterns = [
        r'NIP[:\s]*(\d{3}[-\s]?\d{3}[-\s]?\d{2}[-\s]?\d{2})',
        r'NIP[:\s]*(\d{10})',
        r'numer\s+identyfikacji\s+podatkowej[:\s]*(\d{10})',
    ]
    nips = []
    for pattern in patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        for match in matches:
            nip = _normalize_nip(match)
            if _validate_nip(nip) and nip not in nips:
                nips.append(nip)
    return nips


def _find_regons_in_text(text):
    patterns = [
        r'REGON[:\s]*(\d{9,14})',
        r'rejestr\s+gospodarczy[:\s]*(\d{9,14})',
    ]
    regons = []
    for pattern in patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        for match in matches:
            regon = re.sub(r'[^0-9]', '', match)
            if _validate_regon(regon) and regon not in regons:
                regons.append(regon)
    return regons


def _find_krs_in_text(text):
    patterns = [
        r'KRS[:\s]*(\d{10})',
        r'Krajow\w+\s+Rejestr\w*\s+S[aą]dow\w*[:\s]*(\d{10})',
    ]
    krs_numbers = []
    for pattern in patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        for match in matches:
            krs = re.sub(r'[^0-9]', '', match)
            if len(krs) == 10 and krs not in krs_numbers:
                krs_numbers.append(krs)
    return krs_numbers


def _extract_emails(text):
    email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    emails = re.findall(email_pattern, text)
    skip_domains = {
        'example.com', 'sentry.io', 'wixpress.com', 'wordpress.org',
        'w3.org', 'schema.org', 'googleapis.com',
    }
    return list(dict.fromkeys(
        e.lower() for e in emails
        if not any(d in e.lower() for d in skip_domains)
    ))[:5]


def _extract_phones(text):
    phone_patterns = [
        r'(?:\+48\s?)?\d{2}[\s-]?\d{3}[\s-]?\d{2}[\s-]?\d{2}',
        r'(?:\+48\s?)?\d{3}[\s-]?\d{3}[\s-]?\d{3}',
        r'\(\d{2}\)\s?\d{3}[\s-]?\d{2}[\s-]?\d{2}',
        r'(?:tel|phone|telefon)[.:]\s*[\+]?\d[\d\s\-]{7,14}',
    ]
    phones = []
    for pattern in phone_patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        for m in matches:
            clean = re.sub(r'(?:tel|phone|telefon)[.:]?\s*', '', m, flags=re.IGNORECASE).strip()
            digits = re.sub(r'\D', '', clean)
            if 9 <= len(digits) <= 12:
                phones.append(clean)
    return list(dict.fromkeys(phones))[:5]
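
# Expected behavior of the extractors on synthetic inputs (checksums are
# valid, but all values are made up for illustration):
#
#   _find_nips_in_text('NIP: 123-456-32-18')     -> ['1234563218']
#   _find_regons_in_text('REGON: 123456785')     -> ['123456785']
#   _extract_phones('tel. 58 123 45 67')         -> ['58 123 45 67']
#   _extract_emails('kontakt: biuro@firma-x.pl') -> ['biuro@firma-x.pl']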


def _normalize_url_to_root(url):
    """Strip path from URL, keep only scheme + domain (root page)."""
    try:
        parsed = urlparse(url)
        scheme = parsed.scheme or 'https'
        netloc = parsed.netloc
        if not netloc:
            return url
        return f'{scheme}://{netloc}/'
    except Exception:
        return url
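
# e.g. _normalize_url_to_root('https://firma-x.pl/kontakt/') -> 'https://firma-x.pl/'
# (hypothetical domain; query strings and fragments are dropped along with the path)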


def _is_directory_domain(url):
    """Check if URL belongs to a known business directory."""
    try:
        domain = urlparse(url).netloc.lower()
        # Strip www.
        if domain.startswith('www.'):
            domain = domain[4:]
        return any(domain == d or domain.endswith('.' + d) for d in DIRECTORY_DOMAINS)
    except Exception:
        return False
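
# e.g. _is_directory_domain('https://www.panoramafirm.pl/firma/xyz') -> True
#      _is_directory_domain('https://firma-x.pl/') -> False (hypothetical domain)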


def _fetch_page_text(url, timeout=15):
    """Fetch URL and return plain text content."""
    headers = {
        'User-Agent': 'Mozilla/5.0 (compatible; NordaBizBot/1.0)',
        'Accept': 'text/html,application/xhtml+xml',
    }
    try:
        resp = requests.get(url, headers=headers, timeout=timeout, allow_redirects=True)
        content_type = resp.headers.get('Content-Type', '')
        if 'text/html' not in content_type and 'application/xhtml' not in content_type:
            return None
        resp.encoding = resp.apparent_encoding or 'utf-8'
        soup = BeautifulSoup(resp.text, 'html.parser')
        # Remove non-content elements
        for tag in soup.find_all(['script', 'style', 'nav', 'footer', 'header', 'noscript']):
            tag.decompose()
        text = soup.get_text(separator=' ', strip=True)
        return text
    except requests.exceptions.SSLError:
        # Retry with HTTP
        try:
            http_url = url.replace('https://', 'http://')
            resp = requests.get(http_url, headers=headers, timeout=timeout, allow_redirects=True)
            resp.encoding = resp.apparent_encoding or 'utf-8'
            soup = BeautifulSoup(resp.text, 'html.parser')
            for tag in soup.find_all(['script', 'style', 'nav', 'footer', 'header', 'noscript']):
                tag.decompose()
            return soup.get_text(separator=' ', strip=True)
        except Exception:
            return None
    except Exception as e:
        logger.warning(f"Failed to fetch {url}: {e}")
        return None


class WebsiteDiscoveryService:
    """Discovers and validates website candidates for companies."""

    def __init__(self, db=None):
        self.db = db
        self.brave_api_key = os.getenv('BRAVE_API_KEY')

    def discover_for_company(self, company):
        """
        Search for a website, evaluate the top candidates, and save the best one.
        Scrapes up to 3 results, scores each, and keeps the highest-scoring one.
        Returns a dict with result info.
        """
        if not self.brave_api_key:
            return {'error': 'BRAVE_API_KEY not configured'}
        db = self.db or SessionLocal()
        own_session = self.db is None
        try:
            # Build search query
            city = company.address_city or ''
            query = f'"{company.name}" {city}'.strip()
            # Clear previous error candidates for this company (allows retry)
            db.query(WebsiteDiscoveryCandidate).filter_by(
                company_id=company.id, status='error'
            ).delete()
            db.commit()
            # Search Brave
            urls = self._search_brave(query)
            if not urls:
                return {'error': 'Brak wyników', 'company_id': company.id}  # "No results"
            # Evaluate top 3 candidates, pick the best
            best_candidate = None
            best_score = -1
            seen_urls = set()
            for brave_result in urls[:3]:
                url = _normalize_url_to_root(brave_result['url'])
                # Skip duplicate root URLs (e.g. /kontakt/ and /about/ on the same domain)
                if url in seen_urls:
                    continue
                seen_urls.add(url)
                domain = urlparse(url).netloc.lower()
                if domain.startswith('www.'):
                    domain = domain[4:]
                # Check for an existing candidate with this URL
                existing = db.query(WebsiteDiscoveryCandidate).filter_by(
                    company_id=company.id, candidate_url=url
                ).first()
                if existing:
                    continue
                # Fetch root + common subpages for verification data
                all_text = ''
                extracted = {
                    'nips': [], 'regons': [], 'krs': [],
                    'emails': [], 'phones': [], 'text_snippet': '',
                }
                # 'kontakt' is tried with and without a trailing slash, since
                # redirect behavior varies between servers
                subpages = ['', 'kontakt', 'kontakt/', 'contact', 'o-nas', 'o-firmie']
                for sub in subpages:
                    sub_url = url.rstrip('/') + '/' + sub if sub else url
                    text = _fetch_page_text(sub_url)
                    if not text:
                        continue
                    all_text = all_text + ' ' + text if all_text else text
                    if not extracted['text_snippet']:
                        extracted['text_snippet'] = text[:500]
                    # Merge extracted data (deduplicated)
                    for nip in _find_nips_in_text(text):
                        if nip not in extracted['nips']:
                            extracted['nips'].append(nip)
                    for regon in _find_regons_in_text(text):
                        if regon not in extracted['regons']:
                            extracted['regons'].append(regon)
                    for krs in _find_krs_in_text(text):
                        if krs not in extracted['krs']:
                            extracted['krs'].append(krs)
                    for email in _extract_emails(text):
                        if email not in extracted['emails']:
                            extracted['emails'].append(email)
                    for phone in _extract_phones(text):
                        if phone not in extracted['phones']:
                            extracted['phones'].append(phone)
                    # Stop scanning subpages if we already found strong signals
                    if extracted['nips'] or extracted['regons'] or extracted['krs']:
                        break
                page_text = all_text or None
                # Compute match signals
                signals = self._compute_signals(extracted, company, page_text)
                # Domain name matching bonus
                domain_match = self._domain_matches_company(domain, company.name)
                signals['domain'] = domain_match
                confidence, score = self._compute_confidence(signals)
                candidate_data = {
                    'url': url,
                    'domain': domain,
                    'brave_result': brave_result,
                    'extracted': extracted,
                    'signals': signals,
                    'confidence': confidence,
                    'score': score,
                    'page_text': page_text,
                }
                if score > best_score:
                    best_score = score
                    best_candidate = candidate_data
            if not best_candidate:
                # All URLs already exist as candidates
                return {'status': 'exists', 'company_id': company.id}
            # Save best candidate
            c = best_candidate
            candidate = WebsiteDiscoveryCandidate(
                company_id=company.id,
                search_query=query,
                candidate_url=c['url'],
                candidate_domain=c['domain'],
                brave_title=c['brave_result'].get('title', ''),
                brave_description=c['brave_result'].get('description', ''),
                extracted_nips=c['extracted']['nips'] or None,
                extracted_regons=c['extracted']['regons'] or None,
                extracted_krs=c['extracted']['krs'] or None,
                extracted_phones=c['extracted']['phones'] or None,
                extracted_emails=c['extracted']['emails'] or None,
                page_text_snippet=c['extracted']['text_snippet'] or None,
                match_nip=c['signals'].get('nip', False),
                match_regon=c['signals'].get('regon', False),
                match_krs=c['signals'].get('krs', False),
                match_phone=c['signals'].get('phone', False),
                match_email=c['signals'].get('email', False),
                match_city=c['signals'].get('city', False),
                match_owner=c['signals'].get('owner', False),
                confidence=c['confidence'],
                match_score=c['score'],
            )
            db.add(candidate)
            db.commit()
            return {
                'status': 'found',
                'candidate_id': candidate.id,
                'url': c['url'],
                'confidence': c['confidence'],
                'score': c['score'],
                'signals': c['signals'],
            }
        except Exception as e:
            db.rollback()
            logger.error(f"Discovery error for company {company.id}: {e}")
            return {'error': str(e)}
        finally:
            if own_session:
                db.close()

    def _search_brave(self, query, max_retries=3):
        """Search Brave API with retry on 429, filter directories, return top URLs."""
        headers = {
            'Accept': 'application/json',
            'X-Subscription-Token': self.brave_api_key,
        }
        params = {
            'q': query,
            'count': 10,
            'country': 'pl',
            'search_lang': 'pl',
        }
        for attempt in range(max_retries):
            try:
                resp = requests.get(
                    'https://api.search.brave.com/res/v1/web/search',
                    headers=headers, params=params, timeout=10,
                )
                if resp.status_code == 429:
                    wait = 3 * (attempt + 1)  # 3s, 6s, 9s
                    logger.info(f"Brave API rate limited, waiting {wait}s (attempt {attempt + 1}/{max_retries})")
                    time.sleep(wait)
                    continue
                if resp.status_code != 200:
                    logger.warning(f"Brave API returned {resp.status_code}")
                    return []
                data = resp.json()
                results = []
                for item in data.get('web', {}).get('results', []):
                    url = item.get('url', '')
                    if not url or _is_directory_domain(url):
                        continue
                    results.append({
                        'url': url,
                        'title': item.get('title', ''),
                        'description': item.get('description', ''),
                    })
                return results[:5]
            except Exception as e:
                logger.error(f"Brave search error: {e}")
                return []
        logger.warning(f"Brave API rate limit exceeded after {max_retries} retries for: {query}")
        return []

    def _compute_signals(self, extracted, company, page_text=None):
        """Compare extracted data with company record."""
        signals = {}
        # NIP match (weight 3)
        if company.nip and extracted.get('nips'):
            company_nip = _normalize_nip(company.nip)
            signals['nip'] = company_nip in [_normalize_nip(n) for n in extracted['nips']]
        else:
            signals['nip'] = False
        # REGON match (weight 3)
        if company.regon and extracted.get('regons'):
            company_regon = re.sub(r'[^0-9]', '', company.regon)
            signals['regon'] = company_regon in [re.sub(r'[^0-9]', '', r) for r in extracted['regons']]
        else:
            signals['regon'] = False
        # KRS match (weight 3)
        if company.krs and extracted.get('krs'):
            company_krs = re.sub(r'[^0-9]', '', company.krs)
            signals['krs'] = company_krs in [re.sub(r'[^0-9]', '', k) for k in extracted['krs']]
        else:
            signals['krs'] = False
        # Phone match (weight 2) - compare the last 9 digits
        if company.phone and extracted.get('phones'):
            company_digits = re.sub(r'\D', '', company.phone)[-9:]
            signals['phone'] = any(
                re.sub(r'\D', '', p)[-9:] == company_digits
                for p in extracted['phones']
            )
        else:
            signals['phone'] = False
        # Email match (weight 2) - exact address or same domain
        if company.email and extracted.get('emails'):
            company_email = company.email.lower()
            company_domain = company_email.split('@')[-1] if '@' in company_email else ''
            signals['email'] = any(
                e == company_email or (company_domain and e.split('@')[-1] == company_domain)
                for e in extracted['emails']
            )
        else:
            signals['email'] = False
        # City match (weight 1)
        text = page_text or ''
        if company.address_city and text:
            signals['city'] = company.address_city.lower() in text.lower()
        else:
            signals['city'] = False
        # Owner match (weight 1)
        if hasattr(company, 'owner_name') and company.owner_name and text:
            signals['owner'] = company.owner_name.lower() in text.lower()
        else:
            signals['owner'] = False
        return signals

    def _domain_matches_company(self, domain, company_name):
        """Check if domain name matches company name (handles word reordering)."""
        if not domain or not company_name:
            return False
        # Normalize: lowercase, remove common suffixes
        name = company_name.lower()
        for suffix in [' sp. z o.o.', ' sp.z o.o.', ' s.a.', ' s.c.', ' sp.j.',
                       ' sp. k.', ' sp.p.', ' sp. z o. o.']:
            name = name.replace(suffix, '')
        # Polish char mapping for domain comparison
        pl_map = {'ą': 'a', 'ć': 'c', 'ę': 'e', 'ł': 'l', 'ń': 'n',
                  'ó': 'o', 'ś': 's', 'ź': 'z', 'ż': 'z'}
        # Get domain without TLD
        domain_base = domain.split('.')[0].lower()
        domain_base_clean = re.sub(r'[^a-z0-9]', '', domain_base)
        # Method 1: Full name match (concatenated)
        name_concat = re.sub(r'[^a-z0-9ąćęłńóśźż]', '', name)
        name_ascii = ''.join(pl_map.get(c, c) for c in name_concat)
        if len(name_ascii) >= 3 and (name_ascii in domain_base_clean or domain_base_clean in name_ascii):
            return True
        # Method 2: All significant words present in domain (handles word reordering)
        words = re.findall(r'[a-ząćęłńóśźż]+', name)
        words = [w for w in words if len(w) >= 3]  # skip short words like "i", "sp"
        if words:
            words_ascii = [''.join(pl_map.get(c, c) for c in w) for w in words]
            if all(w in domain_base_clean for w in words_ascii):
                return True
        return False
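
    # Illustrative behavior with a hypothetical company "Usługi Leśne Nowak":
    # the domain 'nowak-uslugi-lesne.pl' fails Method 1 (the concatenated name
    # is not a substring in either direction) but passes Method 2, since all
    # three significant words ('uslugi', 'lesne', 'nowak') appear in the domain
    # base after transliteration - the word-reordering case.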

    def _compute_confidence(self, signals):
        """Compute confidence level and numeric score."""
        weights = {
            'nip': 3, 'regon': 3, 'krs': 3,
            'phone': 2, 'email': 2,
            'city': 1, 'owner': 1,
            'domain': 2,
        }
        score = sum(weights.get(k, 0) for k, v in signals.items() if v)
        if score >= 5:
            return 'high', score
        elif score >= 2:
            return 'medium', score
        else:
            return 'low', score
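
    # Score arithmetic, for reference: NIP (3) + domain (2) = 5 -> 'high';
    # phone (2) + city (1) = 3 -> 'medium'; city alone (1) -> 'low'.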

    def discover_bulk(self, limit=50, delay=5.0):
        """
        Bulk discovery for all companies without a website.
        Returns a dict with progress info.
        """
        db = SessionLocal()
        try:
            # Find companies without a website
            companies = db.query(Company).filter(
                Company.status.in_(['active', 'pending']),
                (Company.website == None) | (Company.website == ''),
            ).order_by(Company.name).limit(limit).all()
            results = {
                'total': len(companies),
                'processed': 0,
                'found': 0,
                'errors': 0,
                'details': [],
            }
            service = WebsiteDiscoveryService(db=db)
            for company in companies:
                result = service.discover_for_company(company)
                results['processed'] += 1
                if result.get('status') == 'found':
                    results['found'] += 1
                elif result.get('error'):
                    results['errors'] += 1
                results['details'].append({
                    'company_id': company.id,
                    'company_name': company.name,
                    'result': result,
                })
                # Rate limit between companies
                if results['processed'] < results['total']:
                    time.sleep(delay)
            return results
        finally:
            db.close()
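

# --- Usage sketch (illustrative only, not wired into the app) ---
# A minimal way to exercise the service from the command line, assuming
# BRAVE_API_KEY is exported and the database has been initialized.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    service = WebsiteDiscoveryService()
    summary = service.discover_bulk(limit=10, delay=5.0)
    print(f"Processed {summary['processed']}/{summary['total']} companies, "
          f"found {summary['found']} candidates, {summary['errors']} errors")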