# nordabiz/logo_fetch_service.py
# Maciej Pienczyn 788ac12d30
# Some checks are pending
# NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
# NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
# NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
# NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
# feat: Add logo recommendation scoring to candidate gallery
# Score candidates based on format (SVG > raster), source reliability
# (img_scan > apple-touch-icon > og:image > favicon), aspect ratio
# (square-ish preferred), and size. Recommended candidate gets amber
# border with "rekomendowane" badge in gallery UI.

# Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
# 2026-02-18 17:29:31 +01:00

# 470 lines
# 17 KiB
# Python

"""
Logo Fetch Service - Downloads multiple logo candidates from company websites.
Candidate sources (lower priority number = tried first):
1. <img> elements with "logo" in id/class (3), in alt (8), otherwise (20)
2. apple-touch-icon with the largest declared size (5)
3. Inline CSS background images on logo/brand elements (7)
4. og:image (10) / twitter:image (11) meta tags
5. link rel="icon" entries with a declared size of at least 32 (15)
6. Google Favicon API fallback (100)
Flow:
1. Fetch website, find all candidates
2. Download up to MAX_CANDIDATES, save as {slug}_cand_{i}.webp
   (SVG sources are kept unconverted as {slug}_cand_{i}.svg)
3. Frontend shows gallery — user picks one
4. confirm_candidate() renames chosen file to {slug}.webp or {slug}.svg
5. cleanup_candidates() removes temp files
"""
import glob
import logging
import os
import re
from io import BytesIO
from urllib.parse import urljoin, urlparse
import requests
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)

# --- HTTP settings used for every outgoing request -------------------------
USER_AGENT = 'Mozilla/5.0 (compatible; NordaBizBot/1.0)'
TIMEOUT = 10  # passed as ``timeout=`` to requests.get
MAX_DOWNLOAD_SIZE = 5 * 1024 * 1024  # 5MB

# --- Image constraints -----------------------------------------------------
MIN_LOGO_SIZE = 32  # px (lowered to catch more candidates)
MAX_LOGO_SIZE = 800  # px
WEBP_QUALITY = 85
MAX_CANDIDATES = 6

# Directory (relative to this module) where logos and candidates are written.
LOGO_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    'static',
    'img',
    'companies',
)

# Display label for each candidate "source" key.
SOURCE_LABELS = {
    'img_scan': 'Element HTML',
    'css_bg': 'Tło CSS',
    'apple-touch-icon': 'Apple Touch Icon',
    'og:image': 'Open Graph',
    'twitter:image': 'Twitter Card',
    'favicon': 'Favicon',
    'google_favicon': 'Google Favicon',
}
class LogoFetchService:
def fetch_candidates(self, website_url: str, slug: str) -> dict:
"""
Fetch multiple logo candidates from company website.
Returns: {
'success': bool,
'message': str,
'candidates': [{'index': 0, 'source': str, 'label': str, 'ext': str, 'width': int, 'height': int}, ...],
'steps': [...]
}
"""
steps = []
candidates = []
if not website_url.startswith('http'):
website_url = 'https://' + website_url
# Step 1: Fetch website
html, base_url = self._step_fetch_website(website_url, steps)
if html is None:
return {'success': False, 'message': steps[-1]['message'], 'candidates': [], 'steps': steps}
soup = BeautifulSoup(html, 'html.parser')
# Step 2: Meta tags
self._step_meta_tags(soup, base_url, candidates, steps)
# Step 3: Scan images
self._step_scan_images(soup, base_url, candidates, steps)
# Add Google Favicon as last-resort fallback
domain = urlparse(base_url).netloc
if domain:
candidates.append({
'url': f'https://www.google.com/s2/favicons?domain={domain}&sz=128',
'source': 'google_favicon',
'priority': 100
})
if not candidates:
steps.append({'step': 'download', 'status': 'error', 'message': 'Nie znaleziono kandydatów na logo'})
return {'success': False, 'message': 'Nie znaleziono logo na stronie firmy', 'candidates': [], 'steps': steps}
# Deduplicate by URL
seen_urls = set()
unique = []
for c in candidates:
if c['url'] not in seen_urls:
seen_urls.add(c['url'])
unique.append(c)
candidates = unique
# Sort by priority
candidates.sort(key=lambda c: c['priority'])
# Step 4+5+6: Download, convert, save each candidate
saved = self._download_and_save_candidates(candidates, slug, steps)
if not saved:
return {'success': False, 'message': 'Nie udało się pobrać żadnego kandydata', 'candidates': [], 'steps': steps}
# Score candidates and find recommendation
recommended_index = None
if saved:
best = max(saved, key=lambda c: self._score_candidate(c))
recommended_index = best['index']
steps.append({
'step': 'save',
'status': 'complete',
'message': f'Zapisano {len(saved)} kandydatów do wyboru'
})
return {
'success': True,
'message': f'Znaleziono {len(saved)} kandydatów na logo',
'candidates': saved,
'recommended_index': recommended_index,
'steps': steps
}
def _download_and_save_candidates(self, candidates, slug, steps):
"""Download up to MAX_CANDIDATES, convert and save each."""
saved = []
idx = 0
# Clean up old candidates first
self.cleanup_candidates(slug)
for candidate in candidates:
if idx >= MAX_CANDIDATES:
break
url = candidate['url']
try:
response = requests.get(url, timeout=TIMEOUT, headers={
'User-Agent': USER_AGENT
})
content_length = int(response.headers.get('content-length', 0))
if content_length > MAX_DOWNLOAD_SIZE:
continue
content_type = response.headers.get('content-type', '')
if not any(t in content_type for t in ['image', 'svg', 'octet-stream']):
if 'html' in content_type:
continue
data = response.content
if len(data) > MAX_DOWNLOAD_SIZE or len(data) < 100:
continue
is_svg = 'svg' in content_type
width, height = 0, 0
if not is_svg:
try:
from PIL import Image
img = Image.open(BytesIO(data))
width, height = img.size
if width < MIN_LOGO_SIZE or height < MIN_LOGO_SIZE:
continue
except Exception:
continue
# Convert
output_data, file_ext = self._convert(data, is_svg)
if output_data is None:
continue
# Get dimensions after conversion
if not is_svg and width == 0:
try:
from PIL import Image
img = Image.open(BytesIO(output_data))
width, height = img.size
except Exception:
pass
# Save as candidate file
os.makedirs(LOGO_DIR, exist_ok=True)
filename = f'{slug}_cand_{idx}.{file_ext}'
filepath = os.path.join(LOGO_DIR, filename)
with open(filepath, 'wb') as f:
f.write(output_data)
saved.append({
'index': idx,
'source': candidate['source'],
'label': SOURCE_LABELS.get(candidate['source'], candidate['source']),
'ext': file_ext,
'width': width,
'height': height,
'size': len(output_data),
'filename': filename
})
idx += 1
except Exception as e:
logger.debug(f"Failed to download candidate {url}: {e}")
continue
count = len(saved)
if count > 0:
steps.append({
'step': 'download',
'status': 'complete',
'message': f'Pobrano {count} obrazów'
})
else:
steps.append({
'step': 'download',
'status': 'error',
'message': 'Żaden kandydat nie spełnił wymagań'
})
return saved
def _convert(self, image_data, is_svg):
"""Convert image to WebP (or keep SVG). Returns (data, ext) or (None, None)."""
if is_svg:
return image_data, 'svg'
try:
from PIL import Image
img = Image.open(BytesIO(image_data))
if img.mode in ('RGBA', 'LA', 'P'):
if img.mode == 'P':
img = img.convert('RGBA')
background = Image.new('RGBA', img.size, (255, 255, 255, 255))
background.paste(img, mask=img.split()[-1] if 'A' in img.mode else None)
img = background.convert('RGB')
elif img.mode != 'RGB':
img = img.convert('RGB')
w, h = img.size
if w > MAX_LOGO_SIZE or h > MAX_LOGO_SIZE:
img.thumbnail((MAX_LOGO_SIZE, MAX_LOGO_SIZE), Image.LANCZOS)
output = BytesIO()
img.save(output, format='WEBP', quality=WEBP_QUALITY)
return output.getvalue(), 'webp'
except Exception as e:
logger.debug(f"Conversion error: {e}")
return None, None
@staticmethod
def confirm_candidate(slug: str, index: int) -> bool:
"""Rename chosen candidate to final logo file."""
# Find candidate file
for ext in ('webp', 'svg'):
cand = os.path.join(LOGO_DIR, f'{slug}_cand_{index}.{ext}')
if os.path.exists(cand):
final = os.path.join(LOGO_DIR, f'{slug}.{ext}')
# Remove old logo in other format
for old_ext in ('webp', 'svg'):
old = os.path.join(LOGO_DIR, f'{slug}.{old_ext}')
if old != final and os.path.exists(old):
os.remove(old)
os.rename(cand, final)
# Cleanup remaining candidates
LogoFetchService.cleanup_candidates(slug)
return True
return False
@staticmethod
def cleanup_candidates(slug: str):
"""Delete all candidate files for a slug."""
pattern = os.path.join(LOGO_DIR, f'{slug}_cand_*')
for f in glob.glob(pattern):
try:
os.remove(f)
except OSError:
pass
@staticmethod
def has_existing_logo(slug: str) -> str | None:
"""Check if company already has a logo. Returns extension or None."""
for ext in ('webp', 'svg'):
if os.path.exists(os.path.join(LOGO_DIR, f'{slug}.{ext}')):
return ext
return None
def _step_fetch_website(self, url, steps):
"""Step 1: Fetch the website HTML."""
try:
response = requests.get(url, timeout=TIMEOUT, headers={
'User-Agent': USER_AGENT,
'Accept': 'text/html,application/xhtml+xml'
}, allow_redirects=True)
response.raise_for_status()
steps.append({
'step': 'fetch_website',
'status': 'complete',
'message': f'Strona pobrana ({len(response.text)} znaków)'
})
return response.text, response.url
except requests.exceptions.SSLError:
try:
http_url = url.replace('https://', 'http://')
response = requests.get(http_url, timeout=TIMEOUT, headers={
'User-Agent': USER_AGENT
}, allow_redirects=True)
response.raise_for_status()
steps.append({
'step': 'fetch_website',
'status': 'complete',
'message': f'Strona pobrana przez HTTP (błąd SSL)'
})
return response.text, response.url
except Exception as e:
steps.append({
'step': 'fetch_website',
'status': 'error',
'message': f'Błąd SSL i HTTP: {str(e)[:100]}'
})
return None, None
except Exception as e:
steps.append({
'step': 'fetch_website',
'status': 'error',
'message': f'Nie udało się pobrać strony: {str(e)[:100]}'
})
return None, None
def _step_meta_tags(self, soup, base_url, candidates, steps):
"""Step 2: Search meta tags for logo candidates."""
found = []
og_img = soup.find('meta', property='og:image')
if og_img and og_img.get('content'):
url = urljoin(base_url, og_img['content'])
candidates.append({'url': url, 'source': 'og:image', 'priority': 10})
found.append('og:image')
tw_img = soup.find('meta', attrs={'name': 'twitter:image'})
if tw_img and tw_img.get('content'):
url = urljoin(base_url, tw_img['content'])
candidates.append({'url': url, 'source': 'twitter:image', 'priority': 11})
found.append('twitter:image')
touch_icons = soup.find_all('link', rel=lambda r: r and 'apple-touch-icon' in r)
if touch_icons:
best = max(touch_icons, key=lambda t: self._parse_size(t.get('sizes', '0x0')))
url = urljoin(base_url, best.get('href', ''))
if url:
candidates.append({'url': url, 'source': 'apple-touch-icon', 'priority': 5})
found.append('apple-touch-icon')
icons = soup.find_all('link', rel=lambda r: r and 'icon' in r and 'apple' not in str(r))
for icon in icons:
size = self._parse_size(icon.get('sizes', '0x0'))
href = icon.get('href', '')
if href and size >= 32:
url = urljoin(base_url, href)
candidates.append({'url': url, 'source': 'favicon', 'priority': 15})
found.append(f'favicon ({icon.get("sizes", "?")})')
if found:
steps.append({'step': 'meta_tags', 'status': 'complete', 'message': f'Znaleziono: {", ".join(found)}'})
else:
steps.append({'step': 'meta_tags', 'status': 'missing', 'message': 'Brak meta tagów z logo'})
def _step_scan_images(self, soup, base_url, candidates, steps):
"""Step 3: Scan img elements for logo candidates."""
found_count = 0
for img in soup.find_all('img'):
attrs_text = ' '.join([
img.get('class', [''])[0] if isinstance(img.get('class'), list) else str(img.get('class', '')),
img.get('id', ''),
img.get('alt', ''),
img.get('src', '')
]).lower()
if 'logo' in attrs_text:
src = img.get('src') or img.get('data-src') or img.get('data-lazy-src')
if src:
url = urljoin(base_url, src)
priority = 20
if 'logo' in (img.get('id', '') + ' '.join(img.get('class', []))).lower():
priority = 3
elif 'logo' in img.get('alt', '').lower():
priority = 8
candidates.append({'url': url, 'source': 'img_scan', 'priority': priority})
found_count += 1
for el in soup.select('header a[class*="logo"], nav a[class*="logo"], .logo, #logo, [class*="brand"]'):
style = el.get('style', '')
bg_match = re.search(r'url\(["\']?([^"\')\s]+)["\']?\)', style)
if bg_match:
url = urljoin(base_url, bg_match.group(1))
candidates.append({'url': url, 'source': 'css_bg', 'priority': 7})
found_count += 1
if found_count > 0:
steps.append({'step': 'scan_images', 'status': 'complete', 'message': f'Znaleziono {found_count} kandydatów z elementów img/CSS'})
else:
steps.append({'step': 'scan_images', 'status': 'missing', 'message': 'Brak elementów img z "logo" w atrybutach'})
@staticmethod
def _score_candidate(candidate):
"""Score a candidate for recommendation. Higher = better logo choice."""
score = 0
# SVG is ideal for logos (vector, scalable)
if candidate['ext'] == 'svg':
score += 30
# Source reliability — how likely this is the actual logo
source_scores = {
'img_scan': 35,
'css_bg': 25,
'apple-touch-icon': 15,
'og:image': 0,
'twitter:image': 0,
'favicon': -10,
'google_favicon': -20,
}
score += source_scores.get(candidate['source'], 0)
# Aspect ratio — square-ish logos are most versatile
w, h = candidate.get('width', 0), candidate.get('height', 0)
if w > 0 and h > 0:
ratio = max(w, h) / min(w, h)
if ratio <= 2.0:
score += 20
elif ratio <= 3.0:
score += 5
else:
score -= 15
elif candidate['ext'] == 'svg':
score += 15
# Size — prefer medium-sized, not favicon-tiny or banner-huge
max_dim = max(w, h) if w > 0 else 0
if max_dim == 0 and candidate['ext'] == 'svg':
score += 10
elif 64 <= max_dim <= 512:
score += 15
elif max_dim > 512:
score += 5
elif max_dim > 0:
score -= 10
return score
@staticmethod
def _parse_size(sizes_str):
"""Parse '180x180' to max dimension int."""
match = re.search(r'(\d+)', str(sizes_str))
return int(match.group(1)) if match else 0