""" Search Service for Norda Biznes Partner Unified search functionality with: - NIP/REGON direct lookup - Synonym expansion - SQLite scoring fallback - PostgreSQL FTS with fuzzy matching (when available) """ import re from dataclasses import dataclass from typing import List, Optional, Tuple from sqlalchemy import text from sqlalchemy.orm import Session from database import Company, Service, Competency, CompanyService, CompanyCompetency @dataclass class SearchResult: """Search result with score and match info""" company: Company score: float match_type: str # 'nip', 'regon', 'exact', 'fuzzy', 'keyword' # Synonym dictionary - reused from nordabiz_chat.py KEYWORD_SYNONYMS = { # IT / Web 'strony': ['www', 'web', 'internet', 'witryny', 'seo', 'e-commerce', 'ecommerce', 'sklep', 'portal'], 'internetowe': ['www', 'web', 'online', 'cyfrowe', 'seo', 'marketing'], 'aplikacje': ['software', 'programowanie', 'systemy', 'crm', 'erp', 'app'], 'it': ['informatyka', 'komputery', 'software', 'systemy', 'serwis'], 'programowanie': ['software', 'kod', 'developer', 'aplikacje'], # Budownictwo 'budowa': ['budownictwo', 'konstrukcje', 'remonty', 'wykończenia', 'dach', 'elewacja'], 'dom': ['budynek', 'mieszkanie', 'nieruchomości', 'budownictwo'], 'remont': ['wykończenie', 'naprawa', 'renowacja', 'modernizacja'], 'hala': ['magazyn', 'produkcja', 'przemysłowa', 'stalowa', 'konstrukcja'], # Transport / Logistyka 'transport': ['przewóz', 'logistyka', 'spedycja', 'dostawa', 'kurier'], 'samochód': ['auto', 'pojazd', 'motoryzacja', 'serwis', 'naprawa'], # Usługi 'księgowość': ['rachunkowość', 'finanse', 'podatki', 'biuro rachunkowe', 'kadry'], 'księgowa': ['rachunkowość', 'finanse', 'podatki', 'biuro rachunkowe'], 'prawo': ['prawnik', 'adwokat', 'radca', 'kancelaria', 'notariusz'], 'prawnik': ['prawo', 'adwokat', 'radca', 'kancelaria', 'prawny'], 'marketing': ['reklama', 'promocja', 'seo', 'social media', 'branding'], # Produkcja 'produkcja': ['wytwarzanie', 'fabryka', 'zakład', 'przemysł'], 'metal': ['stal', 'obróbka', 'spawanie', 'cnc', 'ślusarstwo'], 'drewno': ['stolarka', 'meble', 'tartak', 'carpentry'], 'aluminium': ['alu', 'lekkie', 'profile', 'spawanie'], } class SearchService: """Unified search service for companies""" def __init__(self, db: Session): self.db = db self._is_postgres = self._detect_postgres() def _detect_postgres(self) -> bool: """Check if we're using PostgreSQL""" try: dialect = self.db.bind.dialect.name return dialect == 'postgresql' except: return False def search( self, query: str, category_id: Optional[int] = None, limit: int = 50 ) -> List[SearchResult]: """ Main search method Args: query: Search query string category_id: Optional category filter limit: Max results to return Returns: List of SearchResult objects sorted by relevance """ query = query.strip() if not query: return self._get_all_companies(category_id, limit) # 1. Check for NIP (10 digits) if self._is_nip(query): result = self._search_by_nip(query) if result: return [result] # 2. Check for REGON (9 or 14 digits) if self._is_regon(query): result = self._search_by_regon(query) if result: return [result] # 3. Full-text search with synonyms if self._is_postgres: return self._search_postgres_fts(query, category_id, limit) else: return self._search_sqlite_fallback(query, category_id, limit) def _is_nip(self, query: str) -> bool: """Check if query looks like NIP""" clean = re.sub(r'[\s\-]', '', query) return bool(re.match(r'^\d{10}$', clean)) def _is_regon(self, query: str) -> bool: """Check if query looks like REGON""" clean = re.sub(r'[\s\-]', '', query) return bool(re.match(r'^\d{9}$|^\d{14}$', clean)) def _search_by_nip(self, query: str) -> Optional[SearchResult]: """Direct NIP lookup""" clean_nip = re.sub(r'[\s\-]', '', query) company = self.db.query(Company).filter( Company.nip == clean_nip, Company.status == 'active' ).first() if company: return SearchResult(company=company, score=100.0, match_type='nip') return None def _search_by_regon(self, query: str) -> Optional[SearchResult]: """Direct REGON lookup""" clean_regon = re.sub(r'[\s\-]', '', query) company = self.db.query(Company).filter( Company.regon == clean_regon, Company.status == 'active' ).first() if company: return SearchResult(company=company, score=100.0, match_type='regon') return None def _expand_keywords(self, query: str) -> List[str]: """Expand query with synonyms""" query_lower = query.lower() words = [w.strip('.,?!') for w in query_lower.split() if len(w) > 1] expanded = set(words) for word in words: # Direct synonym lookup if word in KEYWORD_SYNONYMS: expanded.update(KEYWORD_SYNONYMS[word]) # Check if word is in any synonym list for key, synonyms in KEYWORD_SYNONYMS.items(): if word in key or any(word in syn for syn in synonyms): expanded.add(key) expanded.update(synonyms) return list(expanded) def _search_sqlite_fallback( self, query: str, category_id: Optional[int], limit: int ) -> List[SearchResult]: """ SQLite search using keyword scoring Scoring: - Company name match: +10 - Description match: +5 - Service match: +8 - Competency match: +7 - City match: +3 """ keywords = self._expand_keywords(query) # Get all active companies companies_query = self.db.query(Company).filter(Company.status == 'active') if category_id: companies_query = companies_query.filter(Company.category_id == category_id) companies = companies_query.all() # Score each company scored: List[Tuple[float, Company, str]] = [] for company in companies: score = 0.0 match_type = 'keyword' # Name match (highest weight) name_lower = company.name.lower() if any(kw in name_lower for kw in keywords): score += 10 # Exact match bonus if query.lower() in name_lower: score += 5 match_type = 'exact' # Description match if company.description_short: desc_lower = company.description_short.lower() if any(kw in desc_lower for kw in keywords): score += 5 # Services match if company.services: for cs in company.services: if cs.service and any(kw in cs.service.name.lower() for kw in keywords): score += 8 break # Only count once per company # Competencies match if company.competencies: for cc in company.competencies: if cc.competency and any(kw in cc.competency.name.lower() for kw in keywords): score += 7 break # Only count once per company # City match if company.address_city: if any(kw in company.address_city.lower() for kw in keywords): score += 3 # Founding history match (owners, history) - high weight for people search if company.founding_history: history_lower = company.founding_history.lower() if any(kw in history_lower for kw in keywords): score += 12 # High weight - owners/founders are important # Full description match if company.description_full: full_lower = company.description_full.lower() if any(kw in full_lower for kw in keywords): score += 4 if score > 0: scored.append((score, company, match_type)) # Sort by score descending scored.sort(key=lambda x: x[0], reverse=True) # Convert to SearchResult return [ SearchResult(company=c, score=s, match_type=m) for s, c, m in scored[:limit] ] def _search_postgres_fts( self, query: str, category_id: Optional[int], limit: int ) -> List[SearchResult]: """ PostgreSQL full-text search with fuzzy matching Uses: - tsvector for full-text search - pg_trgm for fuzzy matching (typos) """ keywords = self._expand_keywords(query) # Sanitize keywords for tsquery - keep only word characters (alphanumeric + polish chars) sanitized_keywords = [re.sub(r'[^\w]', '', kw, flags=re.UNICODE) for kw in keywords] sanitized_keywords = [kw for kw in sanitized_keywords if kw] # Build tsquery from sanitized keywords tsquery_parts = [f"{kw}:*" for kw in sanitized_keywords] tsquery = ' | '.join(tsquery_parts) # Check if pg_trgm is available try: self.db.execute(text("SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm'")) has_trgm = True except Exception: self.db.rollback() has_trgm = False # Build ILIKE patterns for each keyword (for multi-word searches) # Escape LIKE wildcards in user input before wrapping with % like_patterns = [f'%{kw.replace("%", r"\\%").replace("_", r"\\_")}%' for kw in sanitized_keywords if len(kw) > 2] # Build SQL query with scoring for founding_history matches (owners/founders) if has_trgm: sql = text(""" SELECT c.id, COALESCE(ts_rank(c.search_vector, to_tsquery('simple', :tsquery)), 0) as fts_score, COALESCE(similarity(c.name, :query), 0) as fuzzy_score, CASE WHEN c.founding_history ILIKE ANY(:like_patterns) THEN 0.5 ELSE 0 END as history_score FROM companies c WHERE c.status = 'active' AND ( c.search_vector @@ to_tsquery('simple', :tsquery) OR similarity(c.name, :query) > 0.2 OR c.name ILIKE ANY(:like_patterns) OR c.description_short ILIKE ANY(:like_patterns) OR c.founding_history ILIKE ANY(:like_patterns) OR c.description_full ILIKE ANY(:like_patterns) ) {category_filter} ORDER BY GREATEST( COALESCE(ts_rank(c.search_vector, to_tsquery('simple', :tsquery)), 0), COALESCE(similarity(c.name, :query), 0), CASE WHEN c.founding_history ILIKE ANY(:like_patterns) THEN 0.5 ELSE 0 END ) DESC LIMIT :limit """.format( category_filter="AND c.category_id = :category_id" if category_id else "" )) else: # Fallback without pg_trgm sql = text(""" SELECT c.id, COALESCE(ts_rank(c.search_vector, to_tsquery('simple', :tsquery)), 0) as fts_score, 0 as fuzzy_score, CASE WHEN c.founding_history ILIKE ANY(:like_patterns) THEN 0.5 ELSE 0 END as history_score FROM companies c WHERE c.status = 'active' AND ( c.search_vector @@ to_tsquery('simple', :tsquery) OR c.name ILIKE ANY(:like_patterns) OR c.description_short ILIKE ANY(:like_patterns) OR c.founding_history ILIKE ANY(:like_patterns) OR c.description_full ILIKE ANY(:like_patterns) ) {category_filter} ORDER BY GREATEST(fts_score, CASE WHEN c.founding_history ILIKE ANY(:like_patterns) THEN 0.5 ELSE 0 END) DESC LIMIT :limit """.format( category_filter="AND c.category_id = :category_id" if category_id else "" )) params = { 'tsquery': tsquery, 'query': query, 'like_patterns': like_patterns if like_patterns else ['%%'], 'limit': limit } if category_id: params['category_id'] = category_id try: result = self.db.execute(sql, params) rows = result.fetchall() # Fetch full company objects company_ids = [row[0] for row in rows] if not company_ids: # Fallback to SQLite method if FTS returns nothing return self._search_sqlite_fallback(query, category_id, limit) companies_map = { c.id: c for c in self.db.query(Company).filter(Company.id.in_(company_ids)).all() } results = [] for row in rows: company_id, fts_score, fuzzy_score, history_score = row if company_id in companies_map: # Determine match type and score scores = { 'fts': fts_score or 0, 'fuzzy': fuzzy_score or 0, 'history': history_score or 0 } best_match = max(scores, key=scores.get) score = scores[best_match] * 100 results.append(SearchResult( company=companies_map[company_id], score=score, match_type=best_match )) return results except Exception as e: # If FTS fails, rollback and fall back to SQLite method print(f"PostgreSQL FTS error: {e}, falling back to keyword search") self.db.rollback() return self._search_sqlite_fallback(query, category_id, limit) def _get_all_companies( self, category_id: Optional[int], limit: int ) -> List[SearchResult]: """Return all companies when no query given""" query = self.db.query(Company).filter(Company.status == 'active') if category_id: query = query.filter(Company.category_id == category_id) companies = query.order_by(Company.name).limit(limit).all() return [ SearchResult(company=c, score=0.0, match_type='all') for c in companies ] # Convenience function for use in routes def search_companies( db: Session, query: str, category_id: Optional[int] = None, limit: int = 50 ) -> List[SearchResult]: """ Search companies Args: db: Database session query: Search query category_id: Optional category filter limit: Max results Returns: List of SearchResult objects """ service = SearchService(db) return service.search(query, category_id, limit)