"""
Website Discovery Service
==========================

Discovers websites for companies that don't have one registered.
Uses Brave Web Search API to find candidates, scrapes them for verification data,
and compares extracted information against known company data.
"""

import os
import re
import time
import logging
from urllib.parse import urlparse
from datetime import datetime

import requests
from bs4 import BeautifulSoup

from database import SessionLocal, Company, WebsiteDiscoveryCandidate
from utils.data_quality import update_company_data_quality

logger = logging.getLogger(__name__)

# Domains to skip - business directories, social media, own portal
DIRECTORY_DOMAINS = {
    # Business directories & registries
    'panoramafirm.pl', 'aleo.com', 'rejestr.io', 'krs-pobierz.pl',
    'gowork.pl', 'oferteo.pl', 'pkt.pl', 'firmy.net', 'zumi.pl',
    'baza-firm.com.pl', 'e-krs.pl', 'krs-online.com.pl', 'regon.info',
    'infoveriti.pl', 'companywall.pl', 'findcompany.pl', 'owg.pl',
    'imsig.pl', 'monitorfirm.pb.pl', 'mojepanstwo.pl', 'biznes-polska.pl',
    'zwiazekpracodawcow.pl', 'notariuszepl.top', 'wypr.pl', 'mapcarta.com',
    'analizy.pl', 'transfermarkt.pl', 'mojewejherowo.pl', 'orlyjubilerstwa.pl',
    'norda-biznes.info', 'bizraport.pl', 'aplikuj.pl', 'lexspace.pl',
    'drewnianeabc.pl', 'f-trust.pl', 'itspace.llc', 'biznesfinder.pl',
    'egospodarka.pl', 'bazatel.pl', 'wspanialewesele.com.pl',
    'wyszukiwarkakrs.pl', 'funduszowe.pl', 'itspace.company',
    # Social media
    'facebook.com', 'linkedin.com', 'youtube.com', 'instagram.com',
    'twitter.com', 'x.com', 'tiktok.com',
    # Own portal & major sites
    'nordabiznes.pl', 'google.com', 'google.pl',
    'wikipedia.org', 'olx.pl', 'allegro.pl',
}

# Geographic proximity scoring for Norda Biznes (Wejherowo region)
POWIAT_WEJHEROWSKI = {
    'wejherowo', 'reda', 'rumia', 'luzino', 'gniewino', 'szemud',
    'łęczyce', 'linia', 'choczewo', 'góra', 'bolszewo', 'gościcino',
    'nowy dwór wejherowski', 'kąpino', 'bieszkowice', 'sopieszyno',
}

WOJEWODZTWO_POMORSKIE = {
    'gdańsk', 'gdynia', 'sopot', 'słupsk', 'tczew', 'starogard gdański',
    'chojnice', 'malbork', 'kwidzyn', 'lębork', 'bytów', 'kartuzy',
    'kościerzyna', 'puck', 'żukowo', 'pruszcz gdański', 'ustka',
    'władysławowo', 'hel', 'jastarnia', 'łeba',
}

# Public mailbox providers. Sharing one of these domains with the company's
# registered e-mail says nothing about who owns a website, so they only count
# as an e-mail match when the full address is identical.
_FREE_EMAIL_DOMAINS = frozenset({
    'gmail.com', 'googlemail.com', 'outlook.com', 'hotmail.com', 'live.com',
    'yahoo.com', 'icloud.com', 'protonmail.com', 'proton.me',
    'wp.pl', 'o2.pl', 'onet.pl', 'onet.eu', 'poczta.onet.pl', 'op.pl', 'vp.pl',
    'interia.pl', 'interia.eu', 'poczta.fm', 'tlen.pl', 'gazeta.pl',
})

# Elements removed from a page before its text is extracted.
_NON_CONTENT_TAGS = ['script', 'style', 'nav', 'footer', 'header', 'noscript']

_REGON9_WEIGHTS = (8, 9, 2, 3, 4, 5, 6, 7)
_REGON14_WEIGHTS = (2, 4, 8, 5, 0, 9, 7, 3, 6, 1, 2, 4, 8)

# Maps Polish diacritics to the ASCII letters used in domain names.
_PL_TO_ASCII = str.maketrans('ąćęłńóśźż', 'acelnoszz')


# --- Extraction helpers ---

def _normalize_nip(nip):
    """Return *nip* with every non-digit character removed."""
    return re.sub(r'[^0-9]', '', nip)


def _validate_nip(nip):
    """Return True if *nip* has 10 digits and a correct NIP check digit."""
    nip = _normalize_nip(nip)
    if len(nip) != 10:
        return False
    weights = (6, 5, 7, 2, 3, 4, 5, 6, 7)
    # zip() stops after the nine weighted digits; the tenth is the check digit.
    # A remainder of 10 can never equal a single digit, so it fails naturally.
    checksum = sum(int(digit) * weight for digit, weight in zip(nip, weights)) % 11
    return checksum == int(nip[9])


def _regon_check_digit(digits, weights):
    """Weighted sum of *digits* modulo 11, with a remainder of 10 mapped to 0."""
    checksum = sum(int(digit) * weight for digit, weight in zip(digits, weights)) % 11
    return 0 if checksum == 10 else checksum


def _validate_regon(regon):
    """Return True if *regon* is a 9- or 14-digit REGON with valid check digits.

    A 14-digit REGON must pass both the 9-digit check (on its first nine
    digits) and the 14-digit check.
    """
    regon = re.sub(r'[^0-9]', '', regon)
    if len(regon) == 9:
        return _regon_check_digit(regon, _REGON9_WEIGHTS) == int(regon[8])
    if len(regon) == 14:
        if _regon_check_digit(regon, _REGON9_WEIGHTS) != int(regon[8]):
            return False
        return _regon_check_digit(regon, _REGON14_WEIGHTS) == int(regon[13])
    return False


def _collect_valid_numbers(text, patterns, is_valid):
    """Return unique digit strings captured by *patterns* that pass *is_valid*.

    Patterns are matched case-insensitively; results keep first-seen order.
    """
    found = []
    for pattern in patterns:
        for match in re.findall(pattern, text, re.IGNORECASE):
            digits = re.sub(r'[^0-9]', '', match)
            if is_valid(digits) and digits not in found:
                found.append(digits)
    return found


def _find_nips_in_text(text):
    """Return checksum-valid NIP numbers that follow a NIP label in *text*."""
    patterns = [
        r'NIP[:\s]*([\d][\d\s-]{8,13}[\d])',  # any 10-digit with separators
        r'NIP[:\s]*(\d{10})',
        r'numer\s+identyfikacji\s+podatkowej[:\s]*(\d{10})',
        # Exactly ten digits with optional separators and optional "PL" prefix.
        # The greedy first pattern swallows digits that follow the NIP (e.g. a
        # phone number), which makes the capture too long to validate; this
        # pattern stops at the tenth digit.
        r'NIP[:\s]*(?:PL[\s-]?)?((?:\d[\s-]{0,2}){9}\d)',
    ]
    return _collect_valid_numbers(text, patterns, _validate_nip)


def _find_regons_in_text(text):
    """Return checksum-valid REGON numbers that follow a REGON label in *text*."""
    patterns = [
        r'REGON[:\s]*(\d{9,14})',
        r'rejestr\s+gospodarczy[:\s]*(\d{9,14})',
    ]
    return _collect_valid_numbers(text, patterns, _validate_regon)


def _find_krs_in_text(text):
    """Return 10-digit KRS numbers that follow a KRS label in *text*."""
    patterns = [
        r'KRS[:\s]*(\d{10})',
        r'Krajow\w+\s+Rejestr\w*\s+S[aą]dow\w*[:\s]*(\d{10})',
    ]
    return _collect_valid_numbers(text, patterns, lambda krs: len(krs) == 10)


def _extract_emails(text):
    """Return up to five unique lower-cased e-mail addresses found in *text*.

    Addresses hosted on known technical/boilerplate domains (or their
    subdomains) are dropped.
    """
    email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    skip_domains = {
        'example.com', 'sentry.io', 'wixpress.com', 'wordpress.org',
        'w3.org', 'schema.org', 'googleapis.com',
    }

    def is_skipped(email):
        # Compare the domain itself rather than searching the whole address,
        # so 'example.com' does not also discard 'x@example.com.pl'.
        domain = email.rsplit('@', 1)[-1]
        return any(domain == d or domain.endswith('.' + d) for d in skip_domains)

    candidates = (e.lower() for e in re.findall(email_pattern, text))
    return list(dict.fromkeys(e for e in candidates if not is_skipped(e)))[:5]


def _extract_phones(text):
    """Return up to five unique phone-number strings found in *text*.

    Only matches containing 9 to 12 digits are kept.
    """
    phone_patterns = [
        r'(?:\+48\s?)?\d{2}[\s-]?\d{3}[\s-]?\d{2}[\s-]?\d{2}',
        r'(?:\+48\s?)?\d{3}[\s-]?\d{3}[\s-]?\d{3}',
        r'\(\d{2}\)\s?\d{3}[\s-]?\d{2}[\s-]?\d{2}',
        r'(?:tel|phone|telefon)[.:]\s*[\+]?\d[\d\s\-]{7,14}',
    ]
    phones = []
    for pattern in phone_patterns:
        for match in re.findall(pattern, text, re.IGNORECASE):
            # The last pattern captures its label too; strip it off.
            clean = re.sub(
                r'(?:tel|phone|telefon)[.:]?\s*', '', match, flags=re.IGNORECASE
            ).strip()
            digits = re.sub(r'\D', '', clean)
            if 9 <= len(digits) <= 12:
                phones.append(clean)
    return list(dict.fromkeys(phones))[:5]


def _normalize_url_to_root(url):
    """Strip path from URL, keep only scheme + domain (root page)."""
    try:
        parsed = urlparse(url)
        scheme = parsed.scheme or 'https'
        netloc = parsed.netloc
        if not netloc:
            return url
        return f'{scheme}://{netloc}/'
    except Exception:
        return url


def _domain_of(url):
    """Return the lower-cased network location of *url* without a 'www.' prefix.

    Raises ValueError if urlparse rejects *url*.
    """
    domain = urlparse(url).netloc.lower()
    if domain.startswith('www.'):
        domain = domain[4:]
    return domain


def _is_directory_domain(url):
    """Check if URL belongs to a known business directory."""
    try:
        domain = _domain_of(url)
    except Exception:
        return False
    return any(domain == d or domain.endswith('.' + d) for d in DIRECTORY_DOMAINS)


def _html_response_to_text(resp):
    """Return the visible text of an HTML response, or None for non-HTML content."""
    content_type = resp.headers.get('Content-Type', '')
    if 'text/html' not in content_type and 'application/xhtml' not in content_type:
        return None
    resp.encoding = resp.apparent_encoding or 'utf-8'
    soup = BeautifulSoup(resp.text, 'html.parser')
    # Remove non-content elements
    for tag in soup.find_all(_NON_CONTENT_TAGS):
        tag.decompose()
    return soup.get_text(separator=' ', strip=True)


def _fetch_page_text(url, timeout=15):
    """Fetch URL and return plain text content.

    Returns None when the request fails or the response is not HTML. If an
    https:// URL fails with an SSL error, it is retried once over plain HTTP.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (compatible; NordaBizBot/1.0)',
        'Accept': 'text/html,application/xhtml+xml',
    }
    try:
        resp = requests.get(url, headers=headers, timeout=timeout, allow_redirects=True)
        return _html_response_to_text(resp)
    except requests.exceptions.SSLError as ssl_error:
        if not url.startswith('https://'):
            # No plain-HTTP variant to fall back to.
            logger.warning(f"Failed to fetch {url}: {ssl_error}")
            return None
        # Retry with HTTP; rewrite only the scheme, never the rest of the URL.
        http_url = 'http://' + url[len('https://'):]
        try:
            resp = requests.get(http_url, headers=headers, timeout=timeout, allow_redirects=True)
            return _html_response_to_text(resp)
        except Exception as e:
            logger.warning(f"Failed to fetch {http_url}: {e}")
            return None
    except Exception as e:
        logger.warning(f"Failed to fetch {url}: {e}")
        return None


def _merge_unique(target, items):
    """Append to *target* each element of *items* that it does not already hold."""
    for item in items:
        if item not in target:
            target.append(item)


class WebsiteDiscoveryService:
    """Discovers and validates website candidates for companies."""

    # Pages probed on every candidate site, in order, relative to its root.
    _SUBPAGES = ('', 'kontakt', 'kontakt/', 'contact', 'o-nas', 'o-firmie')

    def __init__(self, db=None):
        """Store an optional DB session and read BRAVE_API_KEY from the environment."""
        self.db = db
        self.brave_api_key = os.getenv('BRAVE_API_KEY')

    def discover_for_company(self, company):
        """
        Search for website, evaluate top candidates, save the best one.

        Scrapes up to 5 search results, scores each, picks highest score.
        Evaluation stops early once a candidate matches the company's NIP,
        REGON or KRS.

        Returns dict with result info: ``{'status': 'found', ...}`` when a
        candidate was saved, ``{'status': 'exists', ...}`` when every result
        was already known, or ``{'error': ...}`` on failure / no results.
        """
        if not self.brave_api_key:
            return {'error': 'BRAVE_API_KEY not configured'}

        own_session = self.db is None
        db = SessionLocal() if own_session else self.db
        try:
            # Build search query
            city = company.address_city or ''
            query = f'"{company.name}" {city}'.strip()

            # Clear previous error candidates for this company (allows retry)
            db.query(WebsiteDiscoveryCandidate).filter_by(
                company_id=company.id, status='error'
            ).delete()
            db.commit()

            # Search Brave
            urls = self._search_brave(query)
            if not urls:
                return {'error': 'Brak wyników', 'company_id': company.id}

            # Evaluate top 5 candidates, pick the best
            best_candidate = None
            best_score = -1
            seen_urls = set()

            for brave_result in urls[:5]:
                url = _normalize_url_to_root(brave_result['url'])

                # Skip duplicate root URLs (e.g. /kontakt/ and /about/ on same domain)
                if url in seen_urls:
                    continue
                seen_urls.add(url)

                try:
                    domain = _domain_of(url)
                except ValueError:
                    # A malformed result must not abort the whole discovery.
                    logger.warning(f"Skipping malformed candidate URL: {url}")
                    continue

                # Check for existing candidate (exact URL or same domain rejected)
                existing = db.query(WebsiteDiscoveryCandidate).filter_by(
                    company_id=company.id, candidate_url=url
                ).first()
                if existing:
                    continue

                rejected_domain = db.query(WebsiteDiscoveryCandidate).filter(
                    WebsiteDiscoveryCandidate.company_id == company.id,
                    WebsiteDiscoveryCandidate.candidate_domain == domain,
                    WebsiteDiscoveryCandidate.status == 'rejected',
                ).first()
                if rejected_domain:
                    continue

                # Fetch root + common subpages for verification data
                extracted, page_text = self._scrape_candidate(url)

                # Compute match signals
                signals = self._compute_signals(extracted, company, page_text)

                # Domain name matching bonus
                signals['domain'] = self._domain_matches_company(domain, company.name)

                confidence, score = self._compute_confidence(signals)

                if score > best_score:
                    best_score = score
                    best_candidate = {
                        'url': url,
                        'domain': domain,
                        'brave_result': brave_result,
                        'extracted': extracted,
                        'signals': signals,
                        'confidence': confidence,
                        'score': score,
                    }

                # NIP/REGON/KRS match = certain, stop evaluating
                if signals.get('nip') or signals.get('regon') or signals.get('krs'):
                    break

            if not best_candidate:
                # All URLs already exist as candidates
                return {'status': 'exists', 'company_id': company.id}

            # Save best candidate
            c = best_candidate
            candidate = WebsiteDiscoveryCandidate(
                company_id=company.id,
                search_query=query,
                candidate_url=c['url'],
                candidate_domain=c['domain'],
                brave_title=c['brave_result'].get('title', ''),
                brave_description=c['brave_result'].get('description', ''),
                extracted_nips=c['extracted']['nips'] or None,
                extracted_regons=c['extracted']['regons'] or None,
                extracted_krs=c['extracted']['krs'] or None,
                extracted_phones=c['extracted']['phones'] or None,
                extracted_emails=c['extracted']['emails'] or None,
                page_text_snippet=c['extracted']['text_snippet'] or None,
                match_nip=c['signals'].get('nip', False),
                match_regon=c['signals'].get('regon', False),
                match_krs=c['signals'].get('krs', False),
                match_phone=c['signals'].get('phone', False),
                match_email=c['signals'].get('email', False),
                match_city=c['signals'].get('city', False),
                match_owner=c['signals'].get('owner', False),
                confidence=c['confidence'],
                match_score=c['score'],
            )
            db.add(candidate)
            db.commit()

            return {
                'status': 'found',
                'candidate_id': candidate.id,
                'url': c['url'],
                'confidence': c['confidence'],
                'score': c['score'],
                'signals': c['signals'],
            }
        except Exception as e:
            db.rollback()
            # logger.exception keeps the traceback next to the original message.
            logger.exception(f"Discovery error for company {company.id}: {e}")
            return {'error': str(e)}
        finally:
            if own_session:
                db.close()

    def _scrape_candidate(self, url):
        """Fetch a candidate's root page and common subpages and extract data.

        Returns ``(extracted, page_text)``: *extracted* holds the merged,
        de-duplicated NIPs/REGONs/KRS numbers/e-mails/phones plus a 500-char
        snippet of the first page fetched; *page_text* is the concatenated
        text of all fetched pages, or None if nothing could be fetched.
        """
        extracted = {
            'nips': [], 'regons': [], 'krs': [],
            'emails': [], 'phones': [], 'text_snippet': '',
        }
        all_text = ''
        root = url.rstrip('/')

        for sub in self._SUBPAGES:
            sub_url = f'{root}/{sub}' if sub else url
            text = _fetch_page_text(sub_url)
            if not text:
                continue

            all_text = f'{all_text} {text}' if all_text else text
            if not extracted['text_snippet']:
                extracted['text_snippet'] = text[:500]

            # Merge extracted data (deduplicated)
            _merge_unique(extracted['nips'], _find_nips_in_text(text))
            _merge_unique(extracted['regons'], _find_regons_in_text(text))
            _merge_unique(extracted['krs'], _find_krs_in_text(text))
            _merge_unique(extracted['emails'], _extract_emails(text))
            _merge_unique(extracted['phones'], _extract_phones(text))

            # Stop scanning subpages if we already found strong signals
            if extracted['nips'] or extracted['regons'] or extracted['krs']:
                break

        return extracted, (all_text or None)

    def _search_brave(self, query, max_retries=3):
        """Search Brave API with retry on 429, filter directories, return top URLs.

        Returns a list of at most five ``{'url', 'title', 'description'}``
        dicts; an empty list on any error or when rate limiting persists.
        """
        headers = {
            'Accept': 'application/json',
            'X-Subscription-Token': self.brave_api_key,
        }
        params = {
            'q': query,
            'count': 10,
            'country': 'pl',
            'search_lang': 'pl',
        }

        for attempt in range(max_retries):
            try:
                resp = requests.get(
                    'https://api.search.brave.com/res/v1/web/search',
                    headers=headers, params=params, timeout=10,
                )
                if resp.status_code == 429:
                    if attempt == max_retries - 1:
                        # No attempt follows, so waiting would only waste time.
                        break
                    wait = 3 * (attempt + 1)  # linear back-off: 3s, 6s, ...
                    logger.info(
                        f"Brave API rate limited, waiting {wait}s "
                        f"(attempt {attempt + 1}/{max_retries})"
                    )
                    time.sleep(wait)
                    continue
                if resp.status_code != 200:
                    logger.warning(f"Brave API returned {resp.status_code}")
                    return []

                data = resp.json()
                results = []
                for item in data.get('web', {}).get('results', []):
                    url = item.get('url', '')
                    if not url or _is_directory_domain(url):
                        continue
                    results.append({
                        'url': url,
                        'title': item.get('title', ''),
                        'description': item.get('description', ''),
                    })
                return results[:5]
            except Exception as e:
                logger.error(f"Brave search error: {e}")
                return []

        logger.warning(f"Brave API rate limit exceeded after {max_retries} retries for: {query}")
        return []

    def _compute_signals(self, extracted, company, page_text=None):
        """Compare extracted data with company record.

        Returns a dict with boolean ``nip``, ``regon``, ``krs``, ``phone``,
        ``email``, ``city`` and ``owner`` flags plus ``geo`` (see
        ``_compute_geo_proximity``).
        """
        signals = {}

        # NIP match (weight 3)
        if company.nip and extracted.get('nips'):
            company_nip = _normalize_nip(company.nip)
            signals['nip'] = company_nip in [_normalize_nip(n) for n in extracted['nips']]
        else:
            signals['nip'] = False

        # REGON match (weight 3)
        if company.regon and extracted.get('regons'):
            company_regon = re.sub(r'[^0-9]', '', company.regon)
            signals['regon'] = company_regon in [
                re.sub(r'[^0-9]', '', r) for r in extracted['regons']
            ]
        else:
            signals['regon'] = False

        # KRS match (weight 3)
        if company.krs and extracted.get('krs'):
            company_krs = re.sub(r'[^0-9]', '', company.krs)
            signals['krs'] = company_krs in [
                re.sub(r'[^0-9]', '', k) for k in extracted['krs']
            ]
        else:
            signals['krs'] = False

        # Phone match (weight 2) - last 9 digits
        if company.phone and extracted.get('phones'):
            company_digits = re.sub(r'\D', '', company.phone)[-9:]
            signals['phone'] = any(
                re.sub(r'\D', '', p)[-9:] == company_digits
                for p in extracted['phones']
            )
        else:
            signals['phone'] = False

        # Email match (weight 2) - exact or same domain
        if company.email and extracted.get('emails'):
            company_email = company.email.lower()
            company_domain = company_email.split('@')[-1] if '@' in company_email else ''
            if company_domain in _FREE_EMAIL_DOMAINS:
                # Sharing a public mailbox provider proves nothing; such
                # addresses must match exactly.
                company_domain = ''
            signals['email'] = any(
                e == company_email or
                (company_domain and e.split('@')[-1] == company_domain)
                for e in extracted['emails']
            )
        else:
            signals['email'] = False

        # City match (weight 1)
        text = page_text or ''
        lowered_text = text.lower()
        if company.address_city and text:
            signals['city'] = company.address_city.lower() in lowered_text
        else:
            signals['city'] = False

        # Owner match (weight 1); not every company record has owner_name
        owner_name = getattr(company, 'owner_name', None)
        if owner_name and text:
            signals['owner'] = owner_name.lower() in lowered_text
        else:
            signals['owner'] = False

        # Geographic proximity (weight varies: 3/2/1)
        signals['geo'] = self._compute_geo_proximity(text, url=None)

        return signals

    def _compute_geo_proximity(self, page_text, url=None):
        """Score geographic proximity to Wejherowo region.

        Matching is a plain substring test on the lower-cased page text, so
        short names can also match inside unrelated words. *url* is accepted
        but not used.

        Returns: 'wejherowo' (3pt), 'powiat' (2pt), 'pomorskie' (1pt), or False.
        """
        text = (page_text or '').lower()
        if not text:
            return False

        # Check Wejherowo first (highest priority)
        if 'wejherowo' in text:
            return 'wejherowo'

        # Check powiat wejherowski cities
        if any(city in text for city in POWIAT_WEJHEROWSKI):
            return 'powiat'

        # Check województwo pomorskie
        if any(city in text for city in WOJEWODZTWO_POMORSKIE):
            return 'pomorskie'

        return False

    def _domain_matches_company(self, domain, company_name):
        """Check if domain name matches company name (handles word reordering).

        Only the first label of *domain* is compared. Polish diacritics in the
        company name are folded to ASCII and common legal-form suffixes are
        removed before comparison.
        """
        if not domain or not company_name:
            return False

        # Normalize: lowercase, remove common suffixes
        name = company_name.lower()
        for suffix in (' sp. z o.o.', ' sp.z o.o.', ' s.a.', ' s.c.', ' sp.j.',
                       ' sp. k.', ' sp.p.', ' sp. z o. o.'):
            name = name.replace(suffix, '')

        # Get domain without TLD
        domain_base = domain.split('.')[0].lower()
        domain_base_clean = re.sub(r'[^a-z0-9]', '', domain_base)
        if not domain_base_clean:
            # An empty string is "contained" in every name; never treat that
            # as a match.
            return False

        # Method 1: Full name match (concatenated)
        name_concat = re.sub(r'[^a-z0-9ąćęłńóśźż]', '', name)
        name_ascii = name_concat.translate(_PL_TO_ASCII)
        if len(name_ascii) >= 3:
            if name_ascii in domain_base_clean:
                return True
            # Apply the same minimum length to the domain side so one- or
            # two-letter labels do not match arbitrary names.
            if len(domain_base_clean) >= 3 and domain_base_clean in name_ascii:
                return True

        # Method 2: All significant words present in domain (handles word reordering)
        words = [w for w in re.findall(r'[a-ząćęłńóśźż]+', name) if len(w) >= 3]
        if words and all(w.translate(_PL_TO_ASCII) in domain_base_clean for w in words):
            return True

        return False

    def _compute_confidence(self, signals):
        """Compute confidence level and numeric score.

        Returns ``(level, score)`` where level is 'high' (score >= 5),
        'medium' (score >= 2) or 'low'.
        """
        weights = {
            'nip': 3, 'regon': 3, 'krs': 3,
            'phone': 2, 'email': 2,
            'city': 1, 'owner': 1,
            'domain': 2,
        }
        # 'geo' has no entry in weights, so it contributes 0 here and is
        # scored separately below.
        score = sum(weights.get(k, 0) for k, v in signals.items() if v)

        # Geographic proximity bonus
        geo_bonus = {'wejherowo': 3, 'powiat': 2, 'pomorskie': 1}
        score += geo_bonus.get(signals.get('geo'), 0)

        if score >= 5:
            return 'high', score
        if score >= 2:
            return 'medium', score
        return 'low', score

    def discover_bulk(self, limit=50, delay=5.0):
        """
        Bulk discovery for all companies without website.

        Processes up to *limit* active/pending companies ordered by name,
        sleeping *delay* seconds between companies. Runs on its own session.

        Returns dict with progress info.
        """
        db = SessionLocal()
        try:
            # Find companies without website
            companies = db.query(Company).filter(
                Company.status.in_(['active', 'pending']),
                (Company.website == None) | (Company.website == ''),  # noqa: E711
            ).order_by(Company.name).limit(limit).all()

            results = {
                'total': len(companies),
                'processed': 0,
                'found': 0,
                'errors': 0,
                'details': [],
            }

            service = WebsiteDiscoveryService(db=db)

            for company in companies:
                result = service.discover_for_company(company)
                results['processed'] += 1

                if result.get('status') == 'found':
                    results['found'] += 1
                elif result.get('error'):
                    results['errors'] += 1

                results['details'].append({
                    'company_id': company.id,
                    'company_name': company.name,
                    'result': result,
                })

                # Rate limit
                if results['processed'] < results['total']:
                    time.sleep(delay)

            return results
        finally:
            db.close()