nordabiz/search_service.py
Maciej Pienczyn e718d96a7d
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
fix(security): Resolve 1 HIGH and 7 MEDIUM vulnerabilities from code review
- HIGH: Fix SQL injection in ZOPK knowledge service (3 functions) — replace f-strings with parameterized queries
- MEDIUM: Sanitize tsquery/LIKE input in SearchService to prevent injection
- MEDIUM: Add @login_required + @role_required(ADMIN) to /health/full endpoint
- MEDIUM: Add @role_required(ADMIN) to ZOPK knowledge search API
- MEDIUM: Add bleach HTML sanitization on write for announcements, events, board proceedings (stored XSS via |safe)
- MEDIUM: Remove partial API key from Gemini service logs
- MEDIUM: Remove @csrf.exempt from chat endpoints, add X-CSRFToken headers in JS
- MEDIUM: Add missing CSRF tokens to 3 POST forms (data_request, benefits_form, benefits_list)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-06 05:25:18 +01:00

423 lines
15 KiB
Python

"""
Search Service for Norda Biznes Partner
Unified search functionality with:
- NIP/REGON direct lookup
- Synonym expansion
- SQLite scoring fallback
- PostgreSQL FTS with fuzzy matching (when available)
"""
import re
from dataclasses import dataclass
from typing import List, Optional, Tuple
from sqlalchemy import text
from sqlalchemy.orm import Session
from database import Company, Service, Competency, CompanyService, CompanyCompetency
@dataclass
class SearchResult:
"""Search result with score and match info"""
company: Company
score: float
match_type: str # 'nip', 'regon', 'exact', 'fuzzy', 'keyword'
# Synonym dictionary - reused from nordabiz_chat.py
KEYWORD_SYNONYMS = {
# IT / Web
'strony': ['www', 'web', 'internet', 'witryny', 'seo', 'e-commerce', 'ecommerce', 'sklep', 'portal'],
'internetowe': ['www', 'web', 'online', 'cyfrowe', 'seo', 'marketing'],
'aplikacje': ['software', 'programowanie', 'systemy', 'crm', 'erp', 'app'],
'it': ['informatyka', 'komputery', 'software', 'systemy', 'serwis'],
'programowanie': ['software', 'kod', 'developer', 'aplikacje'],
# Budownictwo
'budowa': ['budownictwo', 'konstrukcje', 'remonty', 'wykończenia', 'dach', 'elewacja'],
'dom': ['budynek', 'mieszkanie', 'nieruchomości', 'budownictwo'],
'remont': ['wykończenie', 'naprawa', 'renowacja', 'modernizacja'],
'hala': ['magazyn', 'produkcja', 'przemysłowa', 'stalowa', 'konstrukcja'],
# Transport / Logistyka
'transport': ['przewóz', 'logistyka', 'spedycja', 'dostawa', 'kurier'],
'samochód': ['auto', 'pojazd', 'motoryzacja', 'serwis', 'naprawa'],
# Usługi
'księgowość': ['rachunkowość', 'finanse', 'podatki', 'biuro rachunkowe', 'kadry'],
'księgowa': ['rachunkowość', 'finanse', 'podatki', 'biuro rachunkowe'],
'prawo': ['prawnik', 'adwokat', 'radca', 'kancelaria', 'notariusz'],
'prawnik': ['prawo', 'adwokat', 'radca', 'kancelaria', 'prawny'],
'marketing': ['reklama', 'promocja', 'seo', 'social media', 'branding'],
# Produkcja
'produkcja': ['wytwarzanie', 'fabryka', 'zakład', 'przemysł'],
'metal': ['stal', 'obróbka', 'spawanie', 'cnc', 'ślusarstwo'],
'drewno': ['stolarka', 'meble', 'tartak', 'carpentry'],
'aluminium': ['alu', 'lekkie', 'profile', 'spawanie'],
}
class SearchService:
"""Unified search service for companies"""
def __init__(self, db: Session):
self.db = db
self._is_postgres = self._detect_postgres()
def _detect_postgres(self) -> bool:
"""Check if we're using PostgreSQL"""
try:
dialect = self.db.bind.dialect.name
return dialect == 'postgresql'
except:
return False
def search(
self,
query: str,
category_id: Optional[int] = None,
limit: int = 50
) -> List[SearchResult]:
"""
Main search method
Args:
query: Search query string
category_id: Optional category filter
limit: Max results to return
Returns:
List of SearchResult objects sorted by relevance
"""
query = query.strip()
if not query:
return self._get_all_companies(category_id, limit)
# 1. Check for NIP (10 digits)
if self._is_nip(query):
result = self._search_by_nip(query)
if result:
return [result]
# 2. Check for REGON (9 or 14 digits)
if self._is_regon(query):
result = self._search_by_regon(query)
if result:
return [result]
# 3. Full-text search with synonyms
if self._is_postgres:
return self._search_postgres_fts(query, category_id, limit)
else:
return self._search_sqlite_fallback(query, category_id, limit)
def _is_nip(self, query: str) -> bool:
"""Check if query looks like NIP"""
clean = re.sub(r'[\s\-]', '', query)
return bool(re.match(r'^\d{10}$', clean))
def _is_regon(self, query: str) -> bool:
"""Check if query looks like REGON"""
clean = re.sub(r'[\s\-]', '', query)
return bool(re.match(r'^\d{9}$|^\d{14}$', clean))
def _search_by_nip(self, query: str) -> Optional[SearchResult]:
"""Direct NIP lookup"""
clean_nip = re.sub(r'[\s\-]', '', query)
company = self.db.query(Company).filter(
Company.nip == clean_nip,
Company.status == 'active'
).first()
if company:
return SearchResult(company=company, score=100.0, match_type='nip')
return None
def _search_by_regon(self, query: str) -> Optional[SearchResult]:
"""Direct REGON lookup"""
clean_regon = re.sub(r'[\s\-]', '', query)
company = self.db.query(Company).filter(
Company.regon == clean_regon,
Company.status == 'active'
).first()
if company:
return SearchResult(company=company, score=100.0, match_type='regon')
return None
def _expand_keywords(self, query: str) -> List[str]:
"""Expand query with synonyms"""
query_lower = query.lower()
words = [w.strip('.,?!') for w in query_lower.split() if len(w) > 1]
expanded = set(words)
for word in words:
# Direct synonym lookup
if word in KEYWORD_SYNONYMS:
expanded.update(KEYWORD_SYNONYMS[word])
# Check if word is in any synonym list
for key, synonyms in KEYWORD_SYNONYMS.items():
if word in key or any(word in syn for syn in synonyms):
expanded.add(key)
expanded.update(synonyms)
return list(expanded)
def _search_sqlite_fallback(
self,
query: str,
category_id: Optional[int],
limit: int
) -> List[SearchResult]:
"""
SQLite search using keyword scoring
Scoring:
- Company name match: +10
- Description match: +5
- Service match: +8
- Competency match: +7
- City match: +3
"""
keywords = self._expand_keywords(query)
# Get all active companies
companies_query = self.db.query(Company).filter(Company.status == 'active')
if category_id:
companies_query = companies_query.filter(Company.category_id == category_id)
companies = companies_query.all()
# Score each company
scored: List[Tuple[float, Company, str]] = []
for company in companies:
score = 0.0
match_type = 'keyword'
# Name match (highest weight)
name_lower = company.name.lower()
if any(kw in name_lower for kw in keywords):
score += 10
# Exact match bonus
if query.lower() in name_lower:
score += 5
match_type = 'exact'
# Description match
if company.description_short:
desc_lower = company.description_short.lower()
if any(kw in desc_lower for kw in keywords):
score += 5
# Services match
if company.services:
for cs in company.services:
if cs.service and any(kw in cs.service.name.lower() for kw in keywords):
score += 8
break # Only count once per company
# Competencies match
if company.competencies:
for cc in company.competencies:
if cc.competency and any(kw in cc.competency.name.lower() for kw in keywords):
score += 7
break # Only count once per company
# City match
if company.address_city:
if any(kw in company.address_city.lower() for kw in keywords):
score += 3
# Founding history match (owners, history) - high weight for people search
if company.founding_history:
history_lower = company.founding_history.lower()
if any(kw in history_lower for kw in keywords):
score += 12 # High weight - owners/founders are important
# Full description match
if company.description_full:
full_lower = company.description_full.lower()
if any(kw in full_lower for kw in keywords):
score += 4
if score > 0:
scored.append((score, company, match_type))
# Sort by score descending
scored.sort(key=lambda x: x[0], reverse=True)
# Convert to SearchResult
return [
SearchResult(company=c, score=s, match_type=m)
for s, c, m in scored[:limit]
]
def _search_postgres_fts(
self,
query: str,
category_id: Optional[int],
limit: int
) -> List[SearchResult]:
"""
PostgreSQL full-text search with fuzzy matching
Uses:
- tsvector for full-text search
- pg_trgm for fuzzy matching (typos)
"""
keywords = self._expand_keywords(query)
# Sanitize keywords for tsquery - keep only word characters (alphanumeric + polish chars)
sanitized_keywords = [re.sub(r'[^\w]', '', kw, flags=re.UNICODE) for kw in keywords]
sanitized_keywords = [kw for kw in sanitized_keywords if kw]
# Build tsquery from sanitized keywords
tsquery_parts = [f"{kw}:*" for kw in sanitized_keywords]
tsquery = ' | '.join(tsquery_parts)
# Check if pg_trgm is available
try:
self.db.execute(text("SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm'"))
has_trgm = True
except Exception:
self.db.rollback()
has_trgm = False
# Build ILIKE patterns for each keyword (for multi-word searches)
# Escape LIKE wildcards in user input before wrapping with %
like_patterns = [f'%{kw.replace("%", r"\\%").replace("_", r"\\_")}%' for kw in sanitized_keywords if len(kw) > 2]
# Build SQL query with scoring for founding_history matches (owners/founders)
if has_trgm:
sql = text("""
SELECT c.id,
COALESCE(ts_rank(c.search_vector, to_tsquery('simple', :tsquery)), 0) as fts_score,
COALESCE(similarity(c.name, :query), 0) as fuzzy_score,
CASE WHEN c.founding_history ILIKE ANY(:like_patterns) THEN 0.5 ELSE 0 END as history_score
FROM companies c
WHERE c.status = 'active'
AND (
c.search_vector @@ to_tsquery('simple', :tsquery)
OR similarity(c.name, :query) > 0.2
OR c.name ILIKE ANY(:like_patterns)
OR c.description_short ILIKE ANY(:like_patterns)
OR c.founding_history ILIKE ANY(:like_patterns)
OR c.description_full ILIKE ANY(:like_patterns)
)
{category_filter}
ORDER BY GREATEST(
COALESCE(ts_rank(c.search_vector, to_tsquery('simple', :tsquery)), 0),
COALESCE(similarity(c.name, :query), 0),
CASE WHEN c.founding_history ILIKE ANY(:like_patterns) THEN 0.5 ELSE 0 END
) DESC
LIMIT :limit
""".format(
category_filter="AND c.category_id = :category_id" if category_id else ""
))
else:
# Fallback without pg_trgm
sql = text("""
SELECT c.id,
COALESCE(ts_rank(c.search_vector, to_tsquery('simple', :tsquery)), 0) as fts_score,
0 as fuzzy_score,
CASE WHEN c.founding_history ILIKE ANY(:like_patterns) THEN 0.5 ELSE 0 END as history_score
FROM companies c
WHERE c.status = 'active'
AND (
c.search_vector @@ to_tsquery('simple', :tsquery)
OR c.name ILIKE ANY(:like_patterns)
OR c.description_short ILIKE ANY(:like_patterns)
OR c.founding_history ILIKE ANY(:like_patterns)
OR c.description_full ILIKE ANY(:like_patterns)
)
{category_filter}
ORDER BY GREATEST(fts_score, CASE WHEN c.founding_history ILIKE ANY(:like_patterns) THEN 0.5 ELSE 0 END) DESC
LIMIT :limit
""".format(
category_filter="AND c.category_id = :category_id" if category_id else ""
))
params = {
'tsquery': tsquery,
'query': query,
'like_patterns': like_patterns if like_patterns else ['%%'],
'limit': limit
}
if category_id:
params['category_id'] = category_id
try:
result = self.db.execute(sql, params)
rows = result.fetchall()
# Fetch full company objects
company_ids = [row[0] for row in rows]
if not company_ids:
# Fallback to SQLite method if FTS returns nothing
return self._search_sqlite_fallback(query, category_id, limit)
companies_map = {
c.id: c for c in self.db.query(Company).filter(Company.id.in_(company_ids)).all()
}
results = []
for row in rows:
company_id, fts_score, fuzzy_score, history_score = row
if company_id in companies_map:
# Determine match type and score
scores = {
'fts': fts_score or 0,
'fuzzy': fuzzy_score or 0,
'history': history_score or 0
}
best_match = max(scores, key=scores.get)
score = scores[best_match] * 100
results.append(SearchResult(
company=companies_map[company_id],
score=score,
match_type=best_match
))
return results
except Exception as e:
# If FTS fails, rollback and fall back to SQLite method
print(f"PostgreSQL FTS error: {e}, falling back to keyword search")
self.db.rollback()
return self._search_sqlite_fallback(query, category_id, limit)
def _get_all_companies(
self,
category_id: Optional[int],
limit: int
) -> List[SearchResult]:
"""Return all companies when no query given"""
query = self.db.query(Company).filter(Company.status == 'active')
if category_id:
query = query.filter(Company.category_id == category_id)
companies = query.order_by(Company.name).limit(limit).all()
return [
SearchResult(company=c, score=0.0, match_type='all')
for c in companies
]
# Convenience function for use in routes
def search_companies(
db: Session,
query: str,
category_id: Optional[int] = None,
limit: int = 50
) -> List[SearchResult]:
"""
Search companies
Args:
db: Database session
query: Search query
category_id: Optional category filter
limit: Max results
Returns:
List of SearchResult objects
"""
service = SearchService(db)
return service.search(query, category_id, limit)