nordabiz/app.py
Maciej Pienczyn 6c4db17807
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat(push): Web Push (VAPID + pywebpush) dla prywatnych wiadomości
Pierwsza iteracja — trigger to nowa wiadomość prywatna. Rollout
fazowany przez PUSH_USER_WHITELIST w .env: pusta = wszyscy, lista
user_id = tylko wymienieni. Ta sama flaga kontroluje widoczność
dzwonka w navbarze (context_processor inject_push_visibility).

Co jest:
- database/migrations/100 — push_subscriptions + notify_push_messages
- database.py — PushSubscription model + relacja na User
- blueprints/push/ — vapid-public-key, subscribe, unsubscribe, test,
  pending-url (iOS PWA), CSRF exempt, auto-prune martwych (410/404/403)
- static/sw.js — push + notificationclick (z iOS fallback przez
  /push/pending-url w Redis, TTL 5 min)
- static/js/push-client.js — togglePush, iOS detection, ?pushdiag=1
- base.html — dzwonek + wpięcie skryptu gated przez push_bell_visible
- message_routes.py — _send_message_push_notifications po emailach
- requirements.txt — pywebpush==2.0.3

Kill switch: PUSH_KILL_SWITCH=1 zatrzymuje wszystkie wysyłki.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 16:56:49 +02:00

1696 lines
61 KiB
Python

#!/usr/bin/env python3
"""
Norda Biznes Partner - Flask Application
====================================
Main Flask application for Norda Biznes company directory with AI chat.
Features:
- User authentication with email confirmation
- Company directory with advanced search
- AI chat assistant powered by Google Gemini
- PostgreSQL database integration
- Analytics dashboard for chat insights
Author: Maciej Pienczyn, InPi sp. z o.o.
Created: 2025-11-23
"""
import os
import logging
import secrets
import re
import json
import time
from pathlib import Path
from datetime import datetime, timedelta, date
from flask import Flask, render_template, request, jsonify, redirect, url_for, flash, session, Response, send_file, abort
from flask_login import login_user, logout_user, login_required, current_user
# Note: CSRFProtect, Limiter, LoginManager are imported from extensions.py (line ~250)
from werkzeug.security import generate_password_hash, check_password_hash
from dotenv import load_dotenv
from user_agents import parse as parse_user_agent
import uuid
import traceback as tb_module
# Load environment variables (values from the file override existing env vars).
# Candidate files are tried in order; the first one that exists wins.
# When neither exists, python-dotenv falls back to its own default lookup.
import os
for _env_candidate in ('.env', 'nordabiz_config.txt'):
    if os.path.exists(_env_candidate):
        load_dotenv(_env_candidate, override=True)
        break
else:
    load_dotenv(override=True)
# ============================================================
# GLOBAL CONSTANTS - MARKETING
# ============================================================
# Number of business entities (marketing target of the NORDA Chamber).
# Used in every place that displays the number of companies.
COMPANY_COUNT_MARKETING = 150
# ============================================================
# STAGING TEST FEATURES
# ============================================================
# Features currently being tested on staging environment.
# Only rendered when STAGING=true in .env. Edit this dict to update.
STAGING_TEST_FEATURES = {
}
# Root logging configuration: INFO level, "<time> - <logger> - <level> - <message>" lines.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Security logger for fail2ban integration
# Logs to /var/log/nordabiznes/security.log in production
security_logger = logging.getLogger('security')
security_logger.setLevel(logging.WARNING)
_security_log_path = '/var/log/nordabiznes/security.log'
# The file handler is attached only when the log directory already exists;
# otherwise no file handler is added to the security logger here.
if os.path.exists('/var/log/nordabiznes'):
    _security_handler = logging.FileHandler(_security_log_path)
    _security_handler.setFormatter(logging.Formatter(
        '%(asctime)s [%(levelname)s] %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    ))
    security_logger.addHandler(_security_handler)
# Import database models
from database import (
init_db,
SessionLocal,
User,
Company,
Category,
Service,
Competency,
CompanyDigitalMaturity,
CompanyWebsiteAnalysis,
CompanyQualityTracking,
CompanyWebsiteContent,
CompanyAIInsights,
CompanyEvent,
CompanySocialMedia,
CompanyContact,
AIChatConversation,
AIChatMessage,
AIChatFeedback,
AIAPICostLog,
ForumTopic,
ForumReply,
ForumAttachment,
NordaEvent,
EventAttendee,
PrivateMessage,
Classified,
UserNotification,
CompanyRecommendation,
MembershipFee,
MembershipFeeConfig,
Person,
CompanyPerson,
GBPAudit,
ITAudit,
KRSAudit,
CompanyPKD,
CompanyFinancialReport,
UserSession,
UserBlock,
PageView,
UserClick,
AnalyticsDaily,
PopularPagesDaily,
SearchQuery,
ConversionEvent,
JSError,
PopularSearchesDaily,
HourlyActivity,
AuditLog,
SecurityAlert,
ZOPKNews,
SystemRole
)
from utils.decorators import role_required
# Import services
import gemini_service
from nordabiz_chat import NordaBizChatEngine
from search_service import search_companies
import krs_api_service
from file_upload_service import FileUploadService
# Security service for audit log, alerting, GeoIP, 2FA
try:
from security_service import (
log_audit, create_security_alert, get_client_ip,
is_ip_allowed, geoip_check, init_security_service,
generate_totp_secret, get_totp_uri, verify_totp,
generate_backup_codes, verify_backup_code, requires_2fa
)
SECURITY_SERVICE_AVAILABLE = True
except ImportError as e:
SECURITY_SERVICE_AVAILABLE = False
logger.warning(f"Security service not available: {e}")
# SEO audit components for triggering audits via API
import sys
_scripts_path = os.path.join(os.path.dirname(__file__), 'scripts')
if _scripts_path not in sys.path:
sys.path.insert(0, _scripts_path)
try:
from seo_audit import SEOAuditor, SEO_AUDIT_VERSION
SEO_AUDIT_AVAILABLE = True
except ImportError as e:
SEO_AUDIT_AVAILABLE = False
logger.warning(f"SEO audit service not available: {e}")
# GBP (Google Business Profile) audit service
try:
from gbp_audit_service import (
GBPAuditService,
audit_company as gbp_audit_company,
get_company_audit as gbp_get_company_audit,
fetch_google_business_data as gbp_fetch_google_data
)
GBP_AUDIT_AVAILABLE = True
GBP_AUDIT_VERSION = '1.0'
except ImportError as e:
GBP_AUDIT_AVAILABLE = False
GBP_AUDIT_VERSION = None
logger.warning(f"GBP audit service not available: {e}")
# KRS (Krajowy Rejestr Sądowy) audit service
try:
from krs_audit_service import parse_krs_pdf, parse_krs_pdf_full
KRS_AUDIT_AVAILABLE = True
KRS_AUDIT_VERSION = '1.0'
except ImportError as e:
KRS_AUDIT_AVAILABLE = False
KRS_AUDIT_VERSION = None
logger.warning(f"KRS audit service not available: {e}")
# Initialize Flask app
app = Flask(__name__)
# Fix URL scheme behind reverse proxy (NPM → Gunicorn)
# One proxy hop is trusted for each of the X-Forwarded-For/-Proto/-Host/-Prefix headers.
from werkzeug.middleware.proxy_fix import ProxyFix
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)
app.config['PREFERRED_URL_SCHEME'] = 'https'
# Security: Require strong SECRET_KEY (no default value allowed)
# Import of this module fails with ValueError when the key is missing or shorter than 32 characters.
SECRET_KEY = os.getenv('SECRET_KEY')
if not SECRET_KEY or len(SECRET_KEY) < 32:
    raise ValueError("SECRET_KEY must be set in environment variables and be at least 32 characters long")
app.config['SECRET_KEY'] = SECRET_KEY
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=7)
# Security configurations
app.config['WTF_CSRF_ENABLED'] = True
app.config['WTF_CSRF_TIME_LIMIT'] = None  # No time limit for CSRF tokens
# Secure cookies everywhere except when FLASK_ENV is exactly 'development'.
app.config['SESSION_COOKIE_SECURE'] = os.getenv('FLASK_ENV') != 'development'  # HTTPS only in production
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
# Template filters
from zoneinfo import ZoneInfo
_WARSAW_TZ = ZoneInfo('Europe/Warsaw')
_UTC_TZ = ZoneInfo('UTC')


@app.template_filter('local_time')
def local_time_filter(dt, fmt='%d.%m.%Y %H:%M'):
    """Format a date/time value for display in Europe/Warsaw local time.

    Args:
        dt: A datetime, date or time object, or a falsy value.
        fmt: strftime pattern used for the output.

    Returns:
        '' for a falsy value. Plain ``date`` objects (no ``hour`` attribute)
        and plain ``time`` objects (no ``year`` attribute) are formatted as-is
        without timezone conversion. Datetimes are converted to Europe/Warsaw
        first; a naive datetime is treated as UTC.
    """
    if not dt:
        return ''
    # Only a full datetime carries both a calendar part and a clock part.
    is_full_datetime = hasattr(dt, 'hour') and hasattr(dt, 'year')
    if not is_full_datetime:
        return dt.strftime(fmt)
    aware = dt if dt.tzinfo is not None else dt.replace(tzinfo=_UTC_TZ)
    return aware.astimezone(_WARSAW_TZ).strftime(fmt)
@app.template_filter('ensure_url')
def ensure_url_filter(url):
    """Ensure a URL carries an http:// or https:// scheme.

    Args:
        url: URL text as stored (may lack a scheme), or a falsy value.

    Returns:
        The falsy input unchanged; otherwise the URL with surrounding
        whitespace removed and ``https://`` prepended when no http/https
        scheme is present.
    """
    if not url:
        return url
    cleaned = url.strip()
    # URL schemes are case-insensitive, so 'HTTP://example.com' must not be
    # turned into 'https://HTTP://example.com'.
    if not cleaned or cleaned.lower().startswith(('http://', 'https://')):
        return cleaned
    return f'https://{cleaned}'
# Register linkify filter for messages
from utils.helpers import linkify_urls
app.jinja_env.filters['linkify'] = linkify_urls
# Register forum markdown filter
from utils.markdown import register_markdown_filter
register_markdown_filter(app)
# Register founding history formatter
from utils.history_formatter import register_history_filter
register_history_filter(app)
# Initialize extensions from centralized extensions.py
from extensions import csrf, limiter, login_manager
csrf.init_app(app)
# Initialize rate limiter with Redis storage (persistent across restarts)
# Falls back to memory if Redis unavailable
_redis_available = False
try:
    import redis
    _redis_client = redis.Redis(host='localhost', port=6379, db=0)
    _redis_client.ping()
    _redis_available = True
    logger.info("Rate limiter using Redis storage")
except Exception:
    # Any failure above (import or ping) is treated as "Redis unavailable".
    logger.warning("Redis unavailable, rate limiter using memory storage")
# Note: default_limits are set in extensions.py
# Here we only configure storage
# NOTE(review): _storage_uri is a private attribute of the limiter object; it is
# assigned before init_app() so that the chosen backend is in place at initialization.
if _redis_available:
    limiter._storage_uri = "redis://localhost:6379/0"
else:
    limiter._storage_uri = "memory://"
limiter.init_app(app)
from redis_service import init_redis
init_redis(app)
@limiter.request_filter
def is_admin_exempt():
    """Rate-limit filter: exempt authenticated users holding the ADMIN role.

    Returns:
        The truthiness of "authenticated and has ADMIN role"; False when the
        check itself raises.
    """
    from flask_login import current_user
    try:
        exempt = current_user.is_authenticated and current_user.has_role(SystemRole.ADMIN)
    except Exception:
        exempt = False
    return exempt
# Initialize database
init_db()
# Initialize Login Manager (imported from extensions.py)
login_manager.init_app(app)
login_manager.login_view = 'login'  # Will change to 'auth.login' after full migration
# User-facing message (Polish) shown when a login-protected page is requested anonymously.
login_manager.login_message = 'Zaloguj się, aby uzyskać dostęp do tej strony.'
# Initialize Gemini service
# A failure here is logged and startup continues.
try:
    gemini_service.init_gemini_service(model='3-flash')  # Paid tier: 10K RPD, thinking mode, fallback: 2.5-flash-lite (Unlimited) → 2.5-flash (10K)
    logger.info("Gemini service initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize Gemini service: {e}")
# Register blueprints (Phase 1: reports, community)
from blueprints import register_blueprints
register_blueprints(app)
logger.info("Blueprints registered")
@login_manager.user_loader
def load_user(user_id):
    """Load the session user from the database with company associations.

    Args:
        user_id: The user id as stored in the session by Flask-Login.

    Returns:
        The User with ``company_associations`` already loaded (so it remains
        usable after the DB session is closed), or None when the id is
        malformed or no such user exists.
    """
    from sqlalchemy.orm import joinedload
    # A user loader must answer None for an invalid id instead of raising;
    # a tampered or stale session value would otherwise produce a 500.
    try:
        numeric_id = int(user_id)
    except (TypeError, ValueError):
        return None
    db = SessionLocal()
    try:
        user = db.query(User).options(
            joinedload(User.company_associations)
        ).filter_by(id=numeric_id).first()
        if user:
            # Force-load associations before detaching from session
            _ = user.company_associations
        return user
    finally:
        db.close()
# ============================================================
# TEMPLATE CONTEXT PROCESSORS
# ============================================================
@app.context_processor
def inject_globals():
    """Provide site-wide template variables.

    Returns:
        Dict with the current year, the current datetime (``now`` is a value,
        not a callable, because templates call ``now.strftime()``), the
        marketing company count, the staging flag with its feature dict
        (empty outside staging) and the SystemRole enum.
    """
    staging_enabled = os.getenv('STAGING') == 'true'
    context = dict(
        current_year=datetime.now().year,
        now=datetime.now(),
        COMPANY_COUNT=COMPANY_COUNT_MARKETING,  # number of entities (marketing target)
        is_staging=staging_enabled,
        SystemRole=SystemRole,
    )
    context['staging_features'] = STAGING_TEST_FEATURES if staging_enabled else {}
    return context
@app.context_processor
def inject_audit_access():
    """Expose ``is_audit_owner`` (result of utils.decorators.is_audit_owner()) to templates."""
    from utils.decorators import is_audit_owner
    return {'is_audit_owner': is_audit_owner()}
@app.context_processor
def inject_push_visibility():
    """Tell templates whether the Web Push bell is visible for the current user.

    Rule: anonymous users never see it. When PUSH_USER_WHITELIST is empty,
    every logged-in user sees it; otherwise only the listed user ids do.

    Returns:
        Dict with the single key ``push_bell_visible``.
    """
    visible = False
    if current_user.is_authenticated:
        raw = os.getenv('PUSH_USER_WHITELIST', '').strip()
        if not raw:
            visible = True
        else:
            try:
                # Non-numeric entries are ignored rather than rejected.
                allowed_ids = {int(token) for token in raw.split(',') if token.strip().isdigit()}
                visible = current_user.id in allowed_ids
            except Exception:
                visible = False
    return {'push_bell_visible': visible}
@app.context_processor
def inject_company_context():
    """Provide multi-company context to templates.

    Returns:
        {} for anonymous users, users without a company_id, or on error.
        Otherwise a dict with ``user_companies`` (primary first, then oldest
        first), ``active_company_id``, ``active_company`` and
        ``has_multiple_companies``.
    """
    if not (current_user.is_authenticated and current_user.company_id):
        return {}
    from database import UserCompany
    from helpers.company_context import get_active_company_id
    db = SessionLocal()
    try:
        memberships = (
            db.query(UserCompany)
            .filter_by(user_id=current_user.id)
            .order_by(UserCompany.is_primary.desc(), UserCompany.created_at.asc())
            .all()
        )
        # Touch each related company while the session is still open so the
        # objects stay usable in templates afterwards.
        for membership in memberships:
            if membership.company:
                _ = membership.company.name
        active_cid = get_active_company_id()
        # Fall back to the user's own company when the stored selection is
        # no longer one of their memberships.
        if active_cid not in {m.company_id for m in memberships}:
            active_cid = current_user.company_id
            session.pop('active_company_id', None)
        active_company = next(
            (m.company for m in memberships if m.company_id == active_cid),
            None,
        )
        return {
            'user_companies': memberships,
            'active_company_id': active_cid,
            'active_company': active_company,
            'has_multiple_companies': len(memberships) > 1,
        }
    except Exception as e:
        logger.error(f"inject_company_context error for user {current_user.id}: {e}")
        return {}
    finally:
        db.close()
@app.context_processor
def inject_notifications():
    """Provide ``unread_notifications_count`` to templates (0 for anonymous users)."""
    if not current_user.is_authenticated:
        return {'unread_notifications_count': 0}
    db = SessionLocal()
    try:
        unread_query = db.query(UserNotification).filter(
            UserNotification.user_id == current_user.id,
            UserNotification.is_read == False
        )
        return {'unread_notifications_count': unread_query.count()}
    finally:
        db.close()
# ============================================================
# NOTIFICATION HELPERS
# ============================================================
def create_notification(user_id, title, message, notification_type='info',
                        related_type=None, related_id=None, action_url=None):
    """Persist a notification for one user.

    Args:
        user_id: ID of the user to notify.
        title: Notification title.
        message: Notification message/body.
        notification_type: Kind of notification (e.g. news, system, message,
            event, alert); defaults to 'info'.
        related_type: Type of the related entity (company_news, event, ...).
        related_id: ID of the related entity.
        action_url: URL to open when the notification is clicked.

    Returns:
        The stored UserNotification (refreshed after commit), or None when
        saving failed; the failure is logged and the transaction rolled back.
    """
    fields = {
        'user_id': user_id,
        'title': title,
        'message': message,
        'notification_type': notification_type,
        'related_type': related_type,
        'related_id': related_id,
        'action_url': action_url,
    }
    db = SessionLocal()
    try:
        notification = UserNotification(**fields)
        db.add(notification)
        db.commit()
        db.refresh(notification)
        logger.info(f"Created notification for user {user_id}: {title}")
        return notification
    except Exception as e:
        logger.error(f"Error creating notification: {e}")
        db.rollback()
        return None
    finally:
        db.close()
def create_news_notification(company_id, news_id, news_title):
    """Notify every active user of a company that its news item was approved.

    Args:
        company_id: ID of the company.
        news_id: ID of the approved news item.
        news_title: Title of the news item (embedded in the message text).
    """
    db = SessionLocal()
    try:
        # Active users whose company_id matches the given company.
        recipients = db.query(User).filter(
            User.company_id == company_id,
            User.is_active == True
        ).all()
        for recipient in recipients:
            create_notification(
                recipient.id,
                "Nowa aktualnosc o Twojej firmie",
                f"Aktualnosc '{news_title}' zostala zatwierdzona i jest widoczna na profilu firmy.",
                notification_type='news',
                related_type='company_news',
                related_id=news_id,
                action_url=f"/company/{company_id}",
            )
    finally:
        db.close()
# ============================================================
# USER ANALYTICS - TRACKING HELPERS
# ============================================================
# Process-wide mapping from a per-request key to the PageView id recorded for
# that request. Filled by track_page_view, read by inject_page_view_id and
# emptied again by cleanup_page_view_id when the request is torn down.
_current_page_view_id = {}
def _classify_user_agent(ua_string):
    """Parse a User-Agent header.

    Returns:
        Tuple (device_type, browser, browser_version, os_name, os_version,
        is_bot). When parsing fails, generic desktop/'Unknown' values are
        returned and is_bot is False.
    """
    try:
        ua = parse_user_agent(ua_string)
        if ua.is_mobile:
            device_type = 'mobile'
        elif ua.is_tablet:
            device_type = 'tablet'
        else:
            device_type = 'desktop'
        browser = ua.browser.family
        browser_version = ua.browser.version_string
        os_name = ua.os.family
        os_version = ua.os.version_string
        lowered = ua_string.lower()
        # Substrings of automated clients and scanners that the parser's own
        # bot detection is supplemented with.
        is_bot = ua.is_bot or any(marker in lowered for marker in [
            'curl/', 'python-requests', 'axios/', 'wget/', 'scrapy',
            'werkzeug', 'leakix', 'nuclei', 'masscan', 'zgrab', 'httpx',
            'googleassociationservice', 'censysinspect', 'paloaltonetworks',
            'cortex', 'netcraft', 'fasthttp', 'cms-checker',
            'wp-safe-scanner', 'notebooklm', 'ruby/', 'skypeuri',
            'com.apple.webkit', 'networkingextension',
            'oai-searchbot', 'gptbot', 'chatgpt-user',
        ])
        # An empty or bare "Mozilla/5.0" agent is always counted as a bot.
        if not ua_string.strip() or ua_string.strip() == 'Mozilla/5.0':
            is_bot = True
        return device_type, browser, browser_version, os_name, os_version, is_bot
    except Exception:
        return 'desktop', 'Unknown', '', 'Unknown', '', False


def _geoip_fields(ip_address):
    """Best-effort GeoIP lookup; returns (country, city, region), None where unknown."""
    country, city, region = None, None, None
    try:
        from security_service import get_geoip_info
        geo_info = get_geoip_info(ip_address)
        if geo_info:
            country = geo_info.get('country')
            city = geo_info.get('city')
            region = geo_info.get('region')
    except Exception as e:
        logger.debug(f"GeoIP lookup failed for {ip_address}: {e}")
    return country, city, region


def get_or_create_analytics_session():
    """Return the DB id of the visitor's analytics session, creating it if needed.

    The session is identified by a UUID kept in the Flask session under
    ``analytics_session_id``. A new row records the parsed user agent, GeoIP
    data, UTM parameters of the first request and the PWA cookie. An existing
    row gets its last-activity time and duration refreshed, and is linked to
    the user / flagged as PWA once that becomes known.

    Returns:
        Integer primary key of the UserSession row, or None on any error
        (logged, transaction rolled back).
    """
    analytics_session_id = session.get('analytics_session_id')
    if not analytics_session_id:
        analytics_session_id = str(uuid.uuid4())
        session['analytics_session_id'] = analytics_session_id
    db = SessionLocal()
    try:
        user_session = db.query(UserSession).filter_by(session_id=analytics_session_id).first()
        if user_session:
            # Existing session: refresh last activity AND duration.
            user_session.last_activity_at = datetime.now()
            user_session.duration_seconds = int(
                (datetime.now() - user_session.started_at).total_seconds()
            )
            if current_user.is_authenticated and not user_session.user_id:
                user_session.user_id = current_user.id
            # The PWA cookie arrives on the 2nd request (after JS sets it).
            if not user_session.is_pwa and request.cookies.get('pwa_mode') == '1':
                user_session.is_pwa = True
            db.commit()
        else:
            ua_string = request.headers.get('User-Agent', '')
            (device_type, browser, browser_version,
             os_name, os_version, is_bot) = _classify_user_agent(ua_string)
            # NOTE(review): the left-most X-Forwarded-For entry is used as-is;
            # confirm this is intended given the ProxyFix setup of this app.
            country, city, region = None, None, None
            ip_address = request.headers.get('X-Forwarded-For', request.remote_addr)
            if ip_address:
                ip_address = ip_address.split(',')[0].strip()
                country, city, region = _geoip_fields(ip_address)
            # UTM parameters (from the first request of the session).
            utm_fields = {
                name: request.args.get(name, '')[:255] or None
                for name in ('utm_source', 'utm_medium', 'utm_campaign',
                             'utm_term', 'utm_content')
            }
            user_session = UserSession(
                session_id=analytics_session_id,
                user_id=current_user.id if current_user.is_authenticated else None,
                ip_address=ip_address,
                user_agent=ua_string[:2000] if ua_string else None,
                device_type=device_type,
                browser=browser[:50] if browser else None,
                browser_version=browser_version[:20] if browser_version else None,
                os=os_name[:50] if os_name else None,
                os_version=os_version[:20] if os_version else None,
                country=country,
                city=city,
                region=region,
                is_bot=is_bot,
                **utm_fields,
            )
            # PWA detection from cookie (set by JS in standalone mode).
            if request.cookies.get('pwa_mode') == '1':
                user_session.is_pwa = True
            db.add(user_session)
            db.commit()
            db.refresh(user_session)
        return user_session.id
    except Exception as e:
        logger.error(f"Analytics session error: {e}")
        db.rollback()
        return None
    finally:
        db.close()
def track_conversion(event_type: str, company_id: int = None, target_type: str = None,
                     target_value: str = None, metadata: dict = None):
    """Record a conversion event; never raises (errors are logged).

    Args:
        event_type: Kind of conversion (register, login, contact_click, rsvp,
            message, classified). Unknown kinds get the category 'other'.
        company_id: Related company ID (for contact_click).
        target_type: What was clicked (email, phone, website).
        target_value: The clicked value; stored truncated to 500 characters.
        metadata: Additional data as dict.
    """
    try:
        session_db_id = None
        analytics_session_id = session.get('analytics_session_id')
        db = SessionLocal()
        try:
            if analytics_session_id:
                existing = db.query(UserSession).filter_by(session_id=analytics_session_id).first()
                if existing:
                    session_db_id = existing.id
            # Conversion category per event type.
            categories = {'register': 'acquisition', 'login': 'activation'}
            categories.update(dict.fromkeys(
                ('contact_click', 'rsvp', 'message', 'classified'), 'engagement'
            ))
            acting_user_id = current_user.id if current_user.is_authenticated else None
            db.add(ConversionEvent(
                session_id=session_db_id,
                user_id=acting_user_id,
                event_type=event_type,
                event_category=categories.get(event_type, 'other'),
                company_id=company_id,
                target_type=target_type,
                target_value=target_value[:500] if target_value else None,
                source_page=request.url[:500] if request.url else None,
                referrer=request.referrer[:500] if request.referrer else None,
                event_metadata=metadata
            ))
            db.commit()
            logger.info(f"Conversion tracked: {event_type} company={company_id} target={target_type}")
        except Exception as e:
            logger.error(f"Conversion tracking error: {e}")
            db.rollback()
        finally:
            db.close()
    except Exception as e:
        logger.error(f"Conversion tracking outer error: {e}")
@app.before_request
def check_geoip():
    """Reject requests whose IP is not allowed by the security service.

    Static files and ``/health`` are never checked. The original note lists
    the blocked countries as RU, CN, KP, IR, BY, SY, VE, CU; the actual rule
    lives in ``security_service.is_ip_allowed``. A blocked request is logged,
    recorded as a low-severity 'geo_blocked' security alert (best effort) and
    answered with 403.
    """
    # Skip static files and health checks
    if request.path.startswith('/static') or request.path == '/health':
        return
    # The security service is an optional import; without it is_ip_allowed is
    # undefined and every request would fail with NameError.
    if not SECURITY_SERVICE_AVAILABLE:
        return
    if is_ip_allowed():
        return
    ip = request.headers.get('X-Forwarded-For', request.remote_addr)
    if ip:
        ip = ip.split(',')[0].strip()
    from security_service import get_country_code
    country = get_country_code(ip)
    logger.warning(f"GEOIP_BLOCKED ip={ip} country={country} path={request.path}")
    # Create alert for blocked access; failure must not prevent the 403.
    db = None
    try:
        db = SessionLocal()
        from security_service import create_security_alert
        create_security_alert(
            db, 'geo_blocked', 'low',
            ip_address=ip,
            details={'country': country, 'path': request.path, 'user_agent': request.user_agent.string[:200]}
        )
        db.commit()
    except Exception as e:
        logger.error(f"Failed to create geo block alert: {e}")
    finally:
        # Always release the session, also when alert creation raised.
        if db is not None:
            db.close()
    abort(403)
@app.before_request
def update_last_active():
    """Update the user's last_active_at and flush buffered page views.

    Runs for authenticated users only, skips static/API/health/favicon
    requests and is throttled to one DB write per minute per session. Page
    views counted in the session buffer (``_pv_buffer``) are added to
    ``User.page_views_count``. Failures are logged and never break the request.
    """
    if not current_user.is_authenticated:
        return
    if request.path.startswith(('/static', '/api')) or request.path in ('/health', '/favicon.ico'):
        return
    import time
    now = time.time()
    last_update = session.get('_last_active_update', 0)
    if now - last_update < 60:  # 1 minute throttle
        return
    # Stamp first so that a failing database is retried at most once a minute.
    session['_last_active_update'] = now
    pending_views = session.get('_pv_buffer', 0)
    db = None
    try:
        db = SessionLocal()
        user = db.query(User).filter_by(id=current_user.id).first()
        if user:
            user.last_active_at = datetime.now()
            user.page_views_count = (user.page_views_count or 0) + pending_views
            db.commit()
        # Clear the buffer only once the write went through, so buffered
        # views are not lost when the database is temporarily unavailable.
        session['_pv_buffer'] = 0
    except Exception as e:
        logger.warning(f"update_last_active failed: {e}")
    finally:
        if db is not None:
            db.close()
def _page_view_key():
    """Return a dict key that is unique to the request being handled.

    ``request`` is a proxy object shared by all requests, so ``id(request)``
    yields the same number for every request. The key is therefore taken
    from the underlying per-request object.
    """
    return id(request._get_current_object())


@app.before_request
def track_page_view():
    """Record a PageView row for page requests.

    Static files, everything under ``/api`` (which includes the analytics
    tracking endpoints), ``/health`` and ``/favicon.ico`` are not tracked.
    For authenticated users the session page-view buffer is incremented as
    well (flushed in update_last_active). The id of the created row is kept
    for the rest of the request so templates can reference it. Errors are
    logged and never break the request.
    """
    path = request.path
    if path.startswith(('/static', '/api')) or path in ('/health', '/favicon.ico'):
        return
    # Buffer page views for authenticated users (flushed in update_last_active)
    if current_user.is_authenticated:
        session['_pv_buffer'] = session.get('_pv_buffer', 0) + 1
    try:
        session_db_id = get_or_create_analytics_session()
        if not session_db_id:
            return
        db = SessionLocal()
        try:
            page_view = PageView(
                session_id=session_db_id,
                user_id=current_user.id if current_user.is_authenticated else None,
                url=request.url[:2000] if request.url else '',
                path=path[:500] if path else '/',
                referrer=request.referrer[:2000] if request.referrer else None
            )
            # Extract company_id from path if on company page
            if path.startswith('/company/'):
                try:
                    slug = path.split('/')[2].split('?')[0]
                    company = db.query(Company).filter_by(slug=slug).first()
                    if company:
                        page_view.company_id = company.id
                except Exception as e:
                    # The company link is optional; the page view is still stored.
                    logger.debug(f"Page view company lookup failed for {path}: {e}")
            db.add(page_view)
            # Update session page count
            user_session = db.query(UserSession).filter_by(id=session_db_id).first()
            if user_session:
                user_session.page_views_count = (user_session.page_views_count or 0) + 1
            db.commit()
            # Store page_view_id for click tracking (for this request only)
            _current_page_view_id[_page_view_key()] = page_view.id
        except Exception as e:
            logger.error(f"Page view tracking error: {e}")
            db.rollback()
        finally:
            db.close()
    except Exception as e:
        logger.error(f"Page view tracking outer error: {e}")


@app.context_processor
def inject_page_view_id():
    """Inject page_view_id into all templates for JS tracking ('' when none was recorded)."""
    page_view_id = _current_page_view_id.get(_page_view_key(), '')
    return {'page_view_id': page_view_id}


@app.teardown_request
def cleanup_page_view_id(exception=None):
    """Remove this request's page_view_id from the global dict after the request."""
    _current_page_view_id.pop(_page_view_key(), None)
# ============================================================
# SECURITY MIDDLEWARE & HELPERS
# ============================================================
@app.after_request
def set_security_headers(response):
    """Attach the standard security headers (and a CSP) to every response.

    HTML responses without a Last-Modified header additionally get one set to
    the current time, as a freshness signal for SEO crawlers.

    Args:
        response: The outgoing Flask response.

    Returns:
        The same response object with headers added.
    """
    fixed_headers = (
        ('X-Content-Type-Options', 'nosniff'),
        ('X-Frame-Options', 'SAMEORIGIN'),
        ('X-XSS-Protection', '1; mode=block'),
        ('Strict-Transport-Security', 'max-age=31536000; includeSubDomains'),
        ('Referrer-Policy', 'strict-origin-when-cross-origin'),
        ('Permissions-Policy', 'camera=(), microphone=(), geolocation=(self)'),
    )
    for header_name, header_value in fixed_headers:
        response.headers[header_name] = header_value
    # Note: static file caching is handled by Nginx (30d), not Flask
    content_type = response.content_type
    is_html = bool(content_type) and 'text/html' in content_type
    if is_html and 'Last-Modified' not in response.headers:
        from email.utils import formatdate
        from time import time
        response.headers['Last-Modified'] = formatdate(timeval=time(), localtime=False, usegmt=True)
    # Content Security Policy, one directive per entry
    directives = [
        "default-src 'self'",
        "script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net",
        "style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net",
        "img-src 'self' data: https:",
        "font-src 'self' https://cdn.jsdelivr.net",
        "frame-src https://www.google.com/maps/",
        "connect-src 'self'",
    ]
    response.headers['Content-Security-Policy'] = "; ".join(directives)
    return response
def validate_email(email):
    """Check that *email* looks like a valid address.

    Uses a simplified ``local@domain.tld`` pattern (not full RFC 5322).

    Args:
        email: Candidate address.

    Returns:
        True when the whole string matches the pattern and is at most 255
        characters long; False otherwise, including for empty or non-string
        input.
    """
    if not isinstance(email, str) or not email or len(email) > 255:
        return False
    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    # fullmatch instead of match + '$': '$' also matches just before a
    # trailing newline, which let "user@example.com\n" through.
    return re.fullmatch(pattern, email) is not None
def validate_password(password):
    """Check password strength.

    Requirements: at least 8 characters, one ASCII uppercase letter, one
    ASCII lowercase letter and one digit.

    Args:
        password: Candidate password.

    Returns:
        Tuple (is_valid, message). The message is "OK" on success, otherwise
        the user-facing (Polish) description of the first failed requirement.
    """
    if not password or len(password) < 8:
        return False, "Hasło musi mieć minimum 8 znaków"
    # Each required character class with the message shown when it is missing.
    required_classes = (
        (r'[A-Z]', "Hasło musi zawierać przynajmniej jedną wielką literę"),
        (r'[a-z]', "Hasło musi zawierać przynajmniej jedną małą literę"),
        (r'\d', "Hasło musi zawierać przynajmniej jedną cyfrę"),
    )
    for class_pattern, failure_message in required_classes:
        if re.search(class_pattern, password) is None:
            return False, failure_message
    return True, "OK"
def sanitize_input(text, max_length=1000):
    """Normalize user-supplied text.

    Removes null bytes, then cuts the text to *max_length* characters, then
    strips surrounding whitespace (in that order).

    Args:
        text: Raw input; a falsy value yields "".
        max_length: Maximum number of characters kept before stripping.

    Returns:
        The cleaned string.
    """
    if not text:
        return ""
    without_nulls = text.replace('\x00', '')
    return without_nulls[:max_length].strip()
def get_free_tier_usage():
    """Return today's Gemini API usage.

    Returns:
        Dict with ``requests_today`` (number of AIAPICostLog rows for provider
        'gemini' dated today) and ``tokens_today`` (sum of their
        total_tokens). Both are 0 when the query fails.
    """
    from datetime import date
    from sqlalchemy import func
    db = SessionLocal()
    try:
        request_count = func.count(AIAPICostLog.id).label('requests')
        token_sum = func.coalesce(func.sum(AIAPICostLog.total_tokens), 0).label('tokens')
        row = (
            db.query(request_count, token_sum)
            .filter(
                func.date(AIAPICostLog.timestamp) == date.today(),
                AIAPICostLog.api_provider == 'gemini',
            )
            .first()
        )
        return {
            'requests_today': row.requests or 0,
            'tokens_today': int(row.tokens or 0)
        }
    except Exception as e:
        logger.warning(f"Failed to get free tier usage: {e}")
        return {'requests_today': 0, 'tokens_today': 0}
    finally:
        db.close()
def get_brave_api_usage():
    """Return Brave Search API usage for today and the current month.

    The monthly limit is the Brave free tier: 2000 requests/month.

    Returns:
        Dict with ``requests_today``, ``requests_this_month``,
        ``monthly_limit``, ``remaining``, ``usage_percent``, ``tier`` and
        ``is_limit_reached``. When the query fails, zero usage is reported.
    """
    from datetime import date
    from sqlalchemy import func, extract
    db = SessionLocal()
    try:
        today = date.today()

        def count_brave_calls(*criteria):
            """Count logged Brave calls that satisfy the extra filter criteria."""
            row = db.query(
                func.count(AIAPICostLog.id).label('requests')
            ).filter(
                *criteria,
                AIAPICostLog.api_provider == 'brave'
            ).first()
            return row.requests or 0

        monthly_used = count_brave_calls(
            extract('month', AIAPICostLog.timestamp) == today.month,
            extract('year', AIAPICostLog.timestamp) == today.year,
        )
        daily_used = count_brave_calls(func.date(AIAPICostLog.timestamp) == today)
        monthly_limit = 2000  # Brave free tier
        if monthly_limit > 0:
            usage_percent = round((monthly_used / monthly_limit) * 100, 1)
        else:
            usage_percent = 0
        return {
            'requests_today': daily_used,
            'requests_this_month': monthly_used,
            'monthly_limit': monthly_limit,
            'remaining': max(0, monthly_limit - monthly_used),
            'usage_percent': usage_percent,
            'tier': 'free',
            'is_limit_reached': monthly_used >= monthly_limit
        }
    except Exception as e:
        logger.warning(f"Failed to get Brave API usage: {e}")
        return {
            'requests_today': 0,
            'requests_this_month': 0,
            'monthly_limit': 2000,
            'remaining': 2000,
            'usage_percent': 0,
            'tier': 'free',
            'is_limit_reached': False
        }
    finally:
        db.close()
def log_brave_api_call(user_id=None, feature='news_search', company_name=None):
    """Store one Brave API call in the cost log for usage tracking.

    Args:
        user_id: User who triggered the call (optional).
        feature: Feature name (news_search, etc.).
        company_name: Company being searched; only used in the debug log line.
    """
    db = SessionLocal()
    try:
        # A search call has no token usage, so all token counters are zero.
        zero_tokens = dict.fromkeys(('input_tokens', 'output_tokens', 'total_tokens'), 0)
        db.add(AIAPICostLog(
            api_provider='brave',
            model_name='search_api',
            feature=feature,
            user_id=user_id,
            **zero_tokens,
        ))
        db.commit()
        logger.debug(f"Logged Brave API call: {feature} for {company_name}")
    except Exception as e:
        logger.error(f"Failed to log Brave API call: {e}")
        db.rollback()
    finally:
        db.close()
# ============================================================
# HEALTH CHECK
# ============================================================
@app.route('/health')
def health():
    """Health check for monitoring: always answers 200 with {'status': 'ok'}."""
    status_payload = {'status': 'ok'}
    return status_payload, 200
@app.route('/test-error-500')
@login_required
def test_error_500():
    """Deliberately raise to exercise the 500-error notification path.

    Users with admin-panel access trigger the exception; everyone else is
    redirected to the index page with an error flash.
    """
    if current_user.can_access_admin_panel():
        # Intentional failure used to test the error notification.
        raise Exception("TEST ERROR 500 - Celowy błąd testowy do sprawdzenia powiadomień email")
    flash('Brak uprawnień', 'error')
    return redirect(url_for('index'))
@app.route('/health/full')
@login_required
@role_required(SystemRole.ADMIN)
def health_full():
    """
    Extended health check that probes a fixed list of application URLs.

    Every path is requested with Flask's test client (redirects are not
    followed) and the status code is recorded. One company profile URL is
    added to the list when the database contains at least one company.

    Returns:
        A ``(body, status)`` tuple. ``body`` holds the overall status
        (``ok`` / ``degraded``), a pass/fail summary, the per-endpoint
        results and a timestamp. ``status`` is 200 when every endpoint
        passed and 503 otherwise.

    Access: /health/full
    """
    # Status codes counted as "endpoint works":
    # 200 = OK, 302 = redirect (e.g. to the login page), 304 = not modified,
    # 429 = rate limited (the endpoint responds, it is only throttled).
    # Anything else (404, 500, ...) counts as a failure.
    healthy_statuses = (200, 302, 304, 429)

    # List of ALL endpoints to check (path, name)
    # Comprehensive list updated 2026-01-17
    endpoints = [
        # ========== PUBLIC PAGES ==========
        ('/', 'Strona główna'),
        ('/login', 'Logowanie'),
        ('/register', 'Rejestracja'),
        ('/release-notes', 'Historia zmian'),
        ('/search?q=test', 'Wyszukiwarka'),
        ('/aktualnosci', 'Aktualności'),
        ('/forum', 'Forum'),
        ('/kalendarz', 'Kalendarz wydarzeń'),
        ('/tablica', 'Tablica ogłoszeń'),
        ('/nowi-czlonkowie', 'Nowi członkowie'),
        ('/mapa-polaczen', 'Mapa połączeń'),
        ('/forgot-password', 'Reset hasła'),
        # ========== RAPORTY ==========
        ('/raporty/', 'Raporty'),
        ('/raporty/staz-czlonkostwa', 'Raport: Staż członkostwa'),
        ('/raporty/social-media', 'Raport: Social Media'),
        ('/raporty/struktura-branzowa', 'Raport: Struktura branżowa'),
        # ========== ZOPK PUBLIC ==========
        ('/zopk', 'ZOPK: Strona główna'),
        ('/zopk/aktualnosci', 'ZOPK: Aktualności'),
        # ========== CHAT ==========
        ('/chat', 'NordaGPT Chat'),
        # ========== IT AUDIT ==========
        ('/it-audit/form', 'IT Audit: Formularz'),
        # ========== PUBLIC API ==========
        ('/api/companies', 'API: Lista firm'),
        ('/api/model-info', 'API: Model info'),
        ('/api/gbp/audit/health', 'API: GBP health'),
        # ========== ADMIN: CORE ==========
        ('/admin/security', 'Admin: Bezpieczeństwo'),
        ('/admin/analytics', 'Admin: Analityka'),
        ('/admin/status', 'Admin: Status systemu'),
        ('/admin/health', 'Admin: Health dashboard'),
        ('/admin/ai-usage', 'Admin: AI Usage'),
        ('/admin/chat-analytics', 'Admin: Chat analytics'),
        ('/admin/users', 'Admin: Użytkownicy'),
        ('/admin/recommendations', 'Admin: Rekomendacje'),
        ('/admin/fees', 'Admin: Składki'),
        # ========== ADMIN: AUDITS ==========
        ('/admin/seo', 'Admin: SEO Audit'),
        ('/admin/gbp-audit', 'Admin: GBP Audit'),
        ('/admin/social-media', 'Admin: Social Media'),
        ('/admin/social-audit', 'Admin: Social Audit'),
        ('/admin/it-audit', 'Admin: IT Audit'),
        ('/admin/digital-maturity', 'Admin: Digital Maturity'),
        ('/admin/krs-audit', 'Admin: KRS Audit'),
        # ========== ADMIN: COMMUNITY ==========
        ('/admin/forum', 'Admin: Forum'),
        ('/admin/kalendarz', 'Admin: Kalendarz'),
        # ========== ADMIN: ZOPK ==========
        ('/admin/zopk', 'Admin: ZOPK Panel'),
        ('/admin/zopk/news', 'Admin: ZOPK News'),
        ('/admin/zopk/knowledge', 'Admin: ZOPK Knowledge'),
        ('/admin/zopk/knowledge/chunks', 'Admin: ZOPK Chunks'),
        ('/admin/zopk/knowledge/facts', 'Admin: ZOPK Facts'),
        ('/admin/zopk/knowledge/entities', 'Admin: ZOPK Entities'),
        ('/admin/zopk/knowledge/duplicates', 'Admin: ZOPK Duplikaty'),
        ('/admin/zopk/knowledge/fact-duplicates', 'Admin: ZOPK Fact Duplicates'),
        ('/admin/zopk/knowledge/graph', 'Admin: ZOPK Graf'),
        ('/admin/zopk/timeline', 'Admin: ZOPK Timeline'),
        # ========== ZOPK API ==========
        ('/api/zopk/milestones', 'API: ZOPK Milestones'),
        ('/api/zopk/knowledge/dashboard-stats', 'API: ZOPK Dashboard stats'),
        # ========== USER SETTINGS (v1.19.0) ==========
        ('/settings/privacy', 'Ustawienia: Prywatność'),
        ('/settings/blocks', 'Ustawienia: Blokady'),
        ('/settings/2fa', 'Ustawienia: 2FA'),
        # ========== MESSAGES ==========
        ('/wiadomosci', 'Wiadomości: Odebrane'),
        ('/wiadomosci/wyslane', 'Wiadomości: Wysłane'),
        ('/wiadomosci/nowa', 'Wiadomości: Nowa'),
        # ========== EDUCATION ==========
        ('/edukacja', 'Edukacja: Strona główna'),
        # ========== ADMIN: INSIGHTS ==========
        ('/admin/insights', 'Admin: Insights'),
    ]

    # Add one company profile page to the probe list. Note: this is the
    # first row returned by the query, not a randomly chosen company.
    db = SessionLocal()
    try:
        sample_company = db.query(Company).first()
        if sample_company:
            profile_path = f'/company/{sample_company.slug}'
            profile_label = f'Profil: {sample_company.name[:25]}'
            endpoints.append((profile_path, profile_label))
    finally:
        db.close()

    # Probe every endpoint through the test client.
    results = []
    with app.test_client() as client:
        for path, name in endpoints:
            try:
                status = client.get(path, follow_redirects=False).status_code
            except Exception as e:
                # The request itself blew up: report it as a 500 with a short reason.
                results.append({
                    'endpoint': path,
                    'name': name,
                    'status': 500,
                    'ok': False,
                    'error': str(e)[:100]
                })
                continue
            results.append({
                'endpoint': path,
                'name': name,
                'status': status,
                'ok': status in healthy_statuses
            })

    # Summary
    passed = sum(1 for entry in results if entry['ok'])
    failed = len(results) - passed
    all_ok = failed == 0

    body = {
        'status': 'ok' if all_ok else 'degraded',
        'summary': {
            'total': len(results),
            'passed': passed,
            'failed': failed
        },
        'endpoints': results,
        'timestamp': datetime.now().isoformat()
    }
    return body, 200 if all_ok else 503
# ============================================================
# PUBLIC ROUTES - MOVED TO blueprints/public/routes.py
# ============================================================
# The public routes that used to live here have been migrated to the
# public blueprint and are no longer kept in this file.
# See: blueprints/public/routes.py
# ============================================================
# RECOMMENDATIONS ADMIN ROUTES - MOVED TO: blueprints/admin/routes.py
# ============================================================
# ============================================================
# USER MANAGEMENT ADMIN ROUTES
# Moved to: blueprints/admin/routes.py
# NOTE: the AI-parse routes have been moved as well (blueprints/admin/routes_users_api.py)
# ============================================================
# admin_users, admin_user_add - MOVED TO: blueprints/admin/routes.py
# AI-ASSISTED USER CREATION - MOVED TO blueprints/admin/routes_users_api.py
# Routes: /admin/users-api/ai-parse, /admin/users-api/bulk-create
# ============================================================
# USER ANALYTICS API ROUTES - MOVED TO blueprints/api/routes_analytics.py
# ============================================================
# Routes: /api/analytics/track, /api/analytics/heartbeat, /api/analytics/scroll,
# /api/analytics/error, /api/analytics/performance, /api/analytics/conversion
# ============================================================
# RECOMMENDATIONS API ROUTES - MOVED TO blueprints/api/routes_recommendations.py
# ============================================================
# Routes: /api/recommendations/<company_id>, /api/recommendations/create,
# /api/recommendations/<rec_id>/edit, /api/recommendations/<rec_id>/delete
# ============================================================
# B2B CLASSIFIEDS ROUTES - MIGRATED TO blueprints/community/classifieds/
# ============================================================
# Routes: /tablica, /tablica/nowe, /tablica/<id>, /tablica/<id>/zakoncz
# ============================================================
# NEW MEMBERS ROUTE - MOVED TO blueprints/public/routes.py
# ============================================================
# AUTHENTICATION ROUTES - MOVED TO blueprints/auth/routes.py
# ============================================================
# The authentication routes that used to live here have been migrated to
# the auth blueprint and are no longer kept in this file.
# See: blueprints/auth/routes.py
# ============================================================
# TWO-FACTOR AUTHENTICATION - MOVED TO blueprints/auth/routes.py
# ============================================================
# MOJE KONTO - MOVED TO blueprints/auth/routes.py
# ============================================================
# USER DASHBOARD - MOVED TO blueprints/public/routes.py
# ============================================================
# API ROUTES - MOVED TO: blueprints/api/routes_company.py
# Routes: /api/companies, /api/connections, /api/check-email, /api/verify-nip,
# /api/verify-krs, /api/company/<id>/refresh-krs, /api/company/<id>/enrich-ai,
# /api/model-info, /api/admin/test-sanitization
# ============================================================
# ============================================================
# SEO/GBP/SOCIAL AUDIT API - MOVED TO: blueprints/api/routes_*_audit.py
# ============================================================
# ============================================================
# AUDIT DASHBOARDS - MOVED TO: blueprints/audit/routes.py
# ============================================================
# Validation and Company API routes moved to blueprints/api/routes_company.py
# ============================================================
# MODEL COMPARISON - Porównanie modeli AI
# ============================================================
# ============================================================
# SYSTEM STATUS DASHBOARD (Admin only)
# MOVED TO blueprints/admin/routes_status.py
# ============================================================
# ============================================================
# DEBUG PANEL (Admin only)
# ============================================================
# ============================================================
# SOCIAL MEDIA AUDIT ADMIN DASHBOARD
# ============================================================
# ============================================================
# IT AUDIT ADMIN DASHBOARD
# ============================================================
# ============================================================
# IT AUDIT FORM - MOVED TO blueprints/it_audit/
# ============================================================
# Routes: /it-audit/form, /it-audit/save, /api/it-audit/*
# ============================================================
# RAPORTY - MIGRATED TO blueprints/reports/
# ============================================================
# Routes: /raporty, /raporty/staz-czlonkostwa, /raporty/social-media, /raporty/struktura-branzowa
# RELEASE NOTES - MOVED TO blueprints/admin/routes.py (admin_notify_release)
# ============================================================
# ============================================================
# ZOPK PUBLIC ROUTES - MOVED TO blueprints/public/routes_zopk.py
# Routes: /zopk, /zopk/projekty/<slug>, /zopk/aktualnosci
# ============================================================
# ============================================================
# ZOPK ROUTES - MOVED TO BLUEPRINTS
# ============================================================
# All ZOPK routes have been migrated to:
# - blueprints/admin/routes_zopk_dashboard.py
# - blueprints/admin/routes_zopk_news.py
# - blueprints/admin/routes_zopk_knowledge.py
# - blueprints/admin/routes_zopk_timeline.py
# ============================================================
# Endpoint aliases for ZOPK are created in blueprints/__init__.py
# ============================================================
# KRS AUDIT (Krajowy Rejestr Sądowy)
# ============================================================
# ============================================================
# KRS API ROUTES - MOVED TO blueprints/admin/routes_krs_api.py
# ============================================================
# Routes: /admin/krs-api/audit, /admin/krs-api/audit/batch, /admin/krs-api/pdf/<company_id>
# ============================================================
# ERROR HANDLERS
# ============================================================
@app.errorhandler(404)
def not_found(error):
    """Render the custom 404 template and answer with HTTP status 404."""
    page = render_template('errors/404.html')
    return page, 404
from flask_wtf.csrf import CSRFError
@app.errorhandler(CSRFError)
def handle_csrf_error(e):
    """
    Handle a failed CSRF validation.

    Shows a warning flash message and sends the user back to the page named
    in the Referer header, or to the index page when no referrer is present.
    """
    flash('Sesja wygasła lub formularz został nieprawidłowo przesłany. Spróbuj ponownie.', 'warning')
    target = request.referrer or url_for('index')
    return redirect(target)
def send_registration_notification(user_info):
    """
    Send an email notification when a new user registers.

    The message is sent through the project's ``email_service`` to the
    address in the ``ERROR_NOTIFY_EMAIL`` environment variable (a built-in
    default is used when it is unset). Nothing is sent when the email
    service reports it is not configured or the address is empty.

    All values taken from ``user_info`` are HTML-escaped before being placed
    in the HTML body, because they come from the registration form and must
    not be able to inject markup into the notification email. The plain-text
    body is left unescaped.

    Args:
        user_info: Mapping with the registration data. Keys read here:
            ``name``, ``email``, ``company_nip``, ``company_name`` and
            ``is_norda_member``. Missing keys fall back to placeholders.

    Returns:
        None. Any exception is logged and swallowed so that a notification
        failure never breaks the caller.
    """
    from html import escape

    try:
        from email_service import send_email, is_configured
        if not is_configured():
            logger.warning("Email service not configured - skipping registration notification")
            return
        notify_email = os.getenv('ERROR_NOTIFY_EMAIL', 'maciej.pienczyn@inpi.pl')
        if not notify_email:
            return
        reg_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        is_member = "✅ TAK" if user_info.get('is_norda_member') else "❌ NIE"
        company_name = user_info.get('company_name', 'Brak przypisanej firmy')
        subject = f"👤 NordaBiz: Nowa rejestracja - {user_info.get('name', 'Nieznany')}"
        body_text = f"""👤 NOWA REJESTRACJA NA NORDABIZNES.PL
{'='*50}
🕐 Czas: {reg_time}
👤 Imię: {user_info.get('name', 'N/A')}
📧 Email: {user_info.get('email', 'N/A')}
🏢 NIP: {user_info.get('company_nip', 'N/A')}
🏛️ Firma: {company_name}
🎫 Członek NORDA: {is_member}
{'='*50}
🔗 Panel użytkowników: https://nordabiznes.pl/admin/users
"""
        # Escaped copies for the HTML part. str() keeps the previous rendering
        # for non-string values (e.g. None -> "None"); escape() also encodes
        # quotes, which matters for the mailto: attribute value.
        html_name = escape(str(user_info.get('name', 'N/A')))
        html_email = escape(str(user_info.get('email', 'N/A')))
        html_email_href = escape(str(user_info.get('email', '')))
        html_nip = escape(str(user_info.get('company_nip', 'N/A')))
        html_company = escape(str(company_name))
        body_html = f"""<!DOCTYPE html>
<html>
<head><meta charset="UTF-8"></head>
<body style="font-family: 'Inter', Arial, sans-serif; background: #f8fafc; color: #1e293b; padding: 20px;">
<div style="max-width: 600px; margin: 0 auto;">
<div style="background: linear-gradient(135deg, #10b981, #059669); color: white; padding: 20px; border-radius: 8px 8px 0 0;">
<h1 style="margin: 0; font-size: 20px;">👤 Nowa rejestracja na NordaBiznes.pl</h1>
</div>
<div style="background: white; padding: 25px; border-radius: 0 0 8px 8px; box-shadow: 0 4px 6px rgba(0,0,0,0.1);">
<table style="width: 100%; border-collapse: collapse;">
<tr><td style="color: #64748b; padding: 8px 0; border-bottom: 1px solid #e2e8f0;">🕐 Czas:</td><td style="padding: 8px 0; border-bottom: 1px solid #e2e8f0; font-weight: 500;">{reg_time}</td></tr>
<tr><td style="color: #64748b; padding: 8px 0; border-bottom: 1px solid #e2e8f0;">👤 Imię:</td><td style="padding: 8px 0; border-bottom: 1px solid #e2e8f0; font-weight: 600; color: #1e40af;">{html_name}</td></tr>
<tr><td style="color: #64748b; padding: 8px 0; border-bottom: 1px solid #e2e8f0;">📧 Email:</td><td style="padding: 8px 0; border-bottom: 1px solid #e2e8f0;"><a href="mailto:{html_email_href}" style="color: #2563eb;">{html_email}</a></td></tr>
<tr><td style="color: #64748b; padding: 8px 0; border-bottom: 1px solid #e2e8f0;">🏢 NIP:</td><td style="padding: 8px 0; border-bottom: 1px solid #e2e8f0; font-family: monospace;">{html_nip}</td></tr>
<tr><td style="color: #64748b; padding: 8px 0; border-bottom: 1px solid #e2e8f0;">🏛️ Firma:</td><td style="padding: 8px 0; border-bottom: 1px solid #e2e8f0;">{html_company}</td></tr>
<tr><td style="color: #64748b; padding: 8px 0;">🎫 Członek NORDA:</td><td style="padding: 8px 0; font-weight: 600;">{is_member}</td></tr>
</table>
<div style="margin-top: 25px; text-align: center;">
<a href="https://nordabiznes.pl/admin/users" style="display: inline-block; padding: 12px 24px; background: #2563eb; color: white; text-decoration: none; border-radius: 6px; font-weight: 500;">Otwórz panel użytkowników</a>
</div>
</div>
</div>
</body>
</html>"""
        result = send_email(
            to=[notify_email],
            subject=subject,
            body_text=body_text,
            body_html=body_html,
            email_type='registration_notification'
        )
        if result:
            logger.info(f"Registration notification sent to {notify_email}")
        else:
            logger.error(f"Failed to send registration notification to {notify_email}")
    except Exception as e:
        logger.error(f"Failed to send registration notification: {e}")
def send_error_notification(error, request_info):
    """
    Send an email notification about a 500 error.

    The message is sent through the project's ``email_service`` to the
    address in the ``ERROR_NOTIFY_EMAIL`` environment variable (a built-in
    default is used when it is unset). Nothing is sent when the email
    service reports it is not configured or the address is empty.
    NOTE(review): the original docstring said delivery goes via Microsoft
    Graph; that depends on ``email_service`` and cannot be confirmed here.

    Everything interpolated into the HTML body is HTML-escaped. The request
    URL/path are client-controlled, and error text and tracebacks routinely
    contain ``<...>`` fragments (e.g. ``<module>``) that would otherwise be
    parsed as tags and vanish from the rendered email. The plain-text body
    is left unescaped.

    Args:
        error: The exception (or error object) being reported; rendered with
            ``str(error)``.
        request_info: Mapping describing the failing request. Keys read here:
            ``url``, ``path``, ``method``, ``user``, ``ip`` and
            ``user_agent``. Missing keys fall back to placeholders.

    Returns:
        None. Any exception is logged and swallowed so that a notification
        failure never masks the original error.
    """
    from html import escape

    try:
        from email_service import send_email, is_configured
        if not is_configured():
            logger.warning("Email service not configured - skipping error notification")
            return
        error_email = os.getenv('ERROR_NOTIFY_EMAIL', 'maciej.pienczyn@inpi.pl')
        if not error_email:
            return
        # Build error details
        error_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        # format_exc() reports the exception currently being handled, so this
        # must be called while the original error is still active.
        traceback_str = tb_module.format_exc()
        subject = f"🚨 NordaBiz ERROR 500: {request_info.get('path', 'Unknown')}"
        body_text = f"""⚠️ BŁĄD 500 NA NORDABIZNES.PL
{'='*50}
🕐 Czas: {error_time}
🌐 URL: {request_info.get('url', 'N/A')}
📍 Ścieżka: {request_info.get('path', 'N/A')}
📝 Metoda: {request_info.get('method', 'N/A')}
👤 Użytkownik: {request_info.get('user', 'Anonimowy')}
🖥️ IP: {request_info.get('ip', 'N/A')}
🌍 User-Agent: {request_info.get('user_agent', 'N/A')}
{'='*50}
📋 BŁĄD:
{str(error)}
{'='*50}
📜 TRACEBACK:
{traceback_str}
{'='*50}
🔧 Sprawdź logi: ssh maciejpi@57.128.200.27 "sudo journalctl -u nordabiznes --since '10 minutes ago'"
"""
        # Escaped copies for the HTML part; str() keeps the previous rendering
        # for non-string values (e.g. None -> "None").
        html_url = escape(str(request_info.get('url', 'N/A')))
        html_path = escape(str(request_info.get('path', 'N/A')))
        html_method = escape(str(request_info.get('method', 'N/A')))
        html_user = escape(str(request_info.get('user', 'Anonimowy')))
        html_ip = escape(str(request_info.get('ip', 'N/A')))
        html_error = escape(str(error))
        html_traceback = escape(traceback_str)
        body_html = f"""<!DOCTYPE html>
<html>
<head><meta charset="UTF-8"></head>
<body style="font-family: 'Courier New', monospace; background: #1e1e1e; color: #d4d4d4; padding: 20px;">
<div style="max-width: 800px; margin: 0 auto;">
<div style="background: #dc2626; color: white; padding: 15px 20px; border-radius: 8px 8px 0 0;">
<h1 style="margin: 0; font-size: 20px;">🚨 BŁĄD 500 NA NORDABIZNES.PL</h1>
</div>
<div style="background: #2d2d2d; padding: 20px; border-radius: 0 0 8px 8px;">
<table style="width: 100%; border-collapse: collapse;">
<tr><td style="color: #9ca3af; padding: 5px 0;">🕐 Czas:</td><td style="color: #fbbf24;">{error_time}</td></tr>
<tr><td style="color: #9ca3af; padding: 5px 0;">🌐 URL:</td><td style="color: #60a5fa; word-break: break-all;">{html_url}</td></tr>
<tr><td style="color: #9ca3af; padding: 5px 0;">📍 Ścieżka:</td><td style="color: #34d399;">{html_path}</td></tr>
<tr><td style="color: #9ca3af; padding: 5px 0;">📝 Metoda:</td><td>{html_method}</td></tr>
<tr><td style="color: #9ca3af; padding: 5px 0;">👤 Użytkownik:</td><td>{html_user}</td></tr>
<tr><td style="color: #9ca3af; padding: 5px 0;">🖥️ IP:</td><td>{html_ip}</td></tr>
</table>
<div style="margin-top: 20px; padding: 15px; background: #1e1e1e; border-radius: 8px; border-left: 4px solid #dc2626;">
<div style="color: #f87171; font-weight: bold; margin-bottom: 10px;">📋 BŁĄD:</div>
<pre style="margin: 0; white-space: pre-wrap; color: #fca5a5;">{html_error}</pre>
</div>
<div style="margin-top: 20px; padding: 15px; background: #1e1e1e; border-radius: 8px; border-left: 4px solid #f59e0b;">
<div style="color: #fbbf24; font-weight: bold; margin-bottom: 10px;">📜 TRACEBACK:</div>
<pre style="margin: 0; white-space: pre-wrap; font-size: 12px; color: #9ca3af; max-height: 400px; overflow: auto;">{html_traceback}</pre>
</div>
<div style="margin-top: 20px; padding: 15px; background: #1e3a5f; border-radius: 8px;">
<div style="color: #60a5fa;">🔧 <strong>Sprawdź logi:</strong></div>
<code style="display: block; margin-top: 10px; color: #34d399; word-break: break-all;">ssh maciejpi@57.128.200.27 "sudo journalctl -u nordabiznes --since '10 minutes ago'"</code>
</div>
</div>
</div>
</body>
</html>"""
        result = send_email(
            to=[error_email],
            subject=subject,
            body_text=body_text,
            body_html=body_html,
            email_type='error_notification'
        )
        if result:
            logger.info(f"Error notification sent to {error_email}")
        else:
            logger.error(f"Failed to send error notification to {error_email}")
    except Exception as e:
        logger.error(f"Failed to send error notification: {e}")
@app.errorhandler(500)
def internal_error(error):
    """
    Handle an HTTP 500: report the failure by email and render the 500 page.

    Request details (URL, path, method, remote address, User-Agent) and the
    current user's email are collected and handed to
    ``send_error_notification``. The notification call is made synchronously
    from this handler; any failure in it is logged and ignored so the error
    page is always returned.
    """
    # Gather request details, falling back to 'N/A' when no request is bound.
    if request:
        request_info = {
            'url': request.url,
            'path': request.path,
            'method': request.method,
            'ip': request.remote_addr,
            'user_agent': request.headers.get('User-Agent', 'N/A'),
        }
    else:
        request_info = dict.fromkeys(('url', 'path', 'method', 'ip', 'user_agent'), 'N/A')

    if current_user and current_user.is_authenticated:
        request_info['user'] = current_user.email
    else:
        request_info['user'] = 'Anonimowy'

    # Best-effort notification: never let it replace the error page.
    try:
        send_error_notification(error, request_info)
    except Exception as e:
        logger.error(f"Error notification failed: {e}")

    return render_template('errors/500.html'), 500
# ============================================================
# ADMIN - SECURITY DASHBOARD
# ============================================================
# ============================================================
# ANNOUNCEMENTS (Ogłoszenia dla członków)
# ============================================================
def generate_slug(title):
    """
    Generate a URL-friendly slug from a title.

    The title is lower-cased and transliterated to ASCII with ``unidecode``
    when that package is installed; otherwise only the Polish diacritics are
    mapped by hand. Characters other than word characters, whitespace and
    hyphens are removed, and runs of whitespace/hyphens collapse into a
    single hyphen.

    Args:
        title: Source text for the slug.

    Returns:
        The slug, at most 200 characters long, without leading or trailing
        hyphens. An empty title yields an empty string.
    """
    import re
    try:
        from unidecode import unidecode
    except ImportError:
        # Fallback without unidecode: translate Polish letters in one pass.
        polish_to_ascii = str.maketrans('ąćęłńóśźż', 'acelnoszz')
        text = title.lower().translate(polish_to_ascii)
    else:
        text = unidecode(title.lower())
    # Remove special characters, replace spaces with hyphens
    text = re.sub(r'[^\w\s-]', '', text)
    text = re.sub(r'[-\s]+', '-', text).strip('-')
    # Limit slug length. Strip again afterwards: cutting exactly at a
    # separator would otherwise leave a trailing hyphen on the slug.
    return text[:200].rstrip('-')
# ============================================================
# PUBLIC ANNOUNCEMENTS - MOVED TO blueprints/public/routes_announcements.py
# ============================================================
# Routes: /ogloszenia, /ogloszenia/<slug>
# ============================================================
# EXTERNAL CONTACTS - PAGE ROUTES MIGRATED TO blueprints/community/contacts/
# ============================================================
# Routes: /kontakty, /kontakty/<id>, /kontakty/dodaj, /kontakty/<id>/edytuj, /kontakty/<id>/usun
# The contacts API routes have been moved as well (blueprints/api/routes_contacts.py)
# ============================================================
# CONTACTS API ROUTES - MOVED TO blueprints/api/routes_contacts.py
# ============================================================
# Routes: /api/contacts/ai-parse, /api/contacts/bulk-create
# Includes AI prompts for contact parsing
# ============================================================
# HONEYPOT ENDPOINTS (trap for malicious bots)
# ============================================================
@app.route('/wp-admin')
@app.route('/wp-admin/<path:path>')
@app.route('/wp-login.php')
@app.route('/administrator')
@app.route('/phpmyadmin')
@app.route('/phpmyadmin/<path:path>')
@app.route('/.env')
@app.route('/.git/config')
@app.route('/xmlrpc.php')
@app.route('/config.php')
@app.route('/admin.php')
def honeypot_trap(path=None):
    """
    Honeypot for URLs typically probed by malicious bots.

    The routed paths (WordPress, phpMyAdmin, exposed config files) do not
    exist in this application. Each hit is written to the security log with
    the client address, requested path and the first 100 characters of the
    User-Agent, then answered with the regular 404 page.

    Args:
        path: Remainder of the URL for the ``<path:path>`` routes; unused.
    """
    # Use the X-Forwarded-For header when present; for a comma-separated
    # chain keep only its first entry.
    client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
    if client_ip and ',' in client_ip:
        client_ip = client_ip.partition(',')[0].strip()

    user_agent = request.user_agent.string[:100]
    security_logger.warning(f"HONEYPOT ip={client_ip} path={request.path} ua={user_agent}")

    # Answer with a plain 404 so the trap is not revealed.
    not_found_page = render_template('errors/404.html')
    return not_found_page, 404
# ============================================================
# MAIN
# ============================================================
if __name__ == '__main__':
    # Default port is 5001 because macOS AirPlay occupies port 5000.
    port = int(os.environ.get('PORT', 5001))
    # Debug mode is enabled only when FLASK_ENV is exactly "development".
    debug = (os.environ.get('FLASK_ENV') == 'development')
    logger.info(f"Starting Norda Biznes Partner on port {port}")
    app.run(
        host='0.0.0.0',
        port=port,
        debug=debug,
    )