""" Auth Routes ============ Authentication routes: login, logout, register, password reset, email verification, 2FA. """ import os import re import logging import secrets from datetime import datetime, timedelta from flask import render_template, request, redirect, url_for, flash, session, current_app from flask_login import login_required, login_user, logout_user, current_user from werkzeug.security import generate_password_hash, check_password_hash from . import bp from database import SessionLocal, User, Company, UserBlock, UserCompany, SystemRole from utils.helpers import sanitize_input, validate_email, validate_password from extensions import limiter from security_service import log_audit # Logger logger = logging.getLogger(__name__) # Security logger (same name as in app.py for fail2ban integration) security_logger = logging.getLogger('security') # Security service availability check try: from security_service import ( generate_totp_secret, get_totp_uri, verify_totp, generate_backup_codes, verify_backup_code, log_audit ) SECURITY_SERVICE_AVAILABLE = True except ImportError: SECURITY_SERVICE_AVAILABLE = False logger.warning("Security service not available in auth blueprint") def _auto_link_person(db, user): """Auto-link User to Person record by name match (if not already linked).""" if user.person_id or not user.name: return try: from database import Person name_parts = user.name.strip().split() if len(name_parts) < 2: return # Try exact match: first name + last name (case-insensitive) first_name = name_parts[0].upper() last_name = name_parts[-1].upper() from sqlalchemy import func person = db.query(Person).filter( func.upper(Person.nazwisko) == last_name, func.upper(Person.imiona).like(f'{first_name}%'), ).first() if person: user.person_id = person.id logger.info(f"Auto-linked user {user.id} ({user.name}) to person {person.id}") except Exception as e: logger.error(f"Auto-link person failed for user {user.id}: {e}") def 
def _send_registration_notification(user_info):
    """Send an email notification about a newly registered user.

    The recipient is read from the ``ERROR_NOTIFY_EMAIL`` environment
    variable (with a built-in default address). Nothing is sent when the
    email service reports it is not configured or the recipient is empty.
    Any exception is logged and swallowed.

    Args:
        user_info: dict read with the keys ``name``, ``email``,
            ``company_nip``, ``company_name`` and ``is_norda_member``.
    """
    try:
        from email_service import send_email, is_configured

        if not is_configured():
            logger.warning("Email service not configured - skipping registration notification")
            return

        notify_email = os.getenv('ERROR_NOTIFY_EMAIL', 'maciej.pienczyn@inpi.pl')
        if not notify_email:
            return

        reg_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        is_member = "TAK" if user_info.get('is_norda_member') else "NIE"
        # register() always passes the 'company_name' key, with None when no
        # company matched, so a dict.get default would never apply and the
        # mail would read "Firma: None". Fall back on any falsy value instead.
        company_name = user_info.get('company_name') or 'Brak przypisanej firmy'

        subject = f"NordaBiz: Nowa rejestracja - {user_info.get('name', 'Nieznany')}"
        body_text = f"""NOWA REJESTRACJA NA NORDABIZNES.PL
{'='*50}

Czas: {reg_time}
Imie: {user_info.get('name', 'N/A')}
Email: {user_info.get('email', 'N/A')}
NIP: {user_info.get('company_nip', 'N/A')}
Firma: {company_name}
Czlonek NORDA: {is_member}

{'='*50}
"""

        send_email(
            to=[notify_email],
            subject=subject,
            body_text=body_text,
            email_type='registration_notification'
        )
        logger.info(f"Registration notification sent for {user_info.get('email')}")
    except Exception as e:
        logger.error(f"Failed to send registration notification: {e}")


def _nip_checksum_ok(nip):
    """Return True when the 10-digit string *nip* has a valid check digit.

    The first nine digits are weighted, summed and reduced modulo 11; the
    result must equal the tenth digit. The caller must have verified that
    *nip* consists of exactly ten digits.
    """
    weights = (6, 5, 7, 2, 3, 4, 5, 6, 7)
    checksum = sum(int(digit) * weight for digit, weight in zip(nip, weights)) % 11
    return checksum == int(nip[9])


# ============================================================
# REGISTRATION
# ============================================================

@bp.route('/register', methods=['GET', 'POST'])
@limiter.limit("50 per hour;200 per day")
def register():
    """User registration.

    GET renders the registration form. POST validates email, password,
    required fields, full name and NIP (format and checksum), then creates an
    unverified user with a 24-hour verification token, links the user to an
    active company with the same NIP when one exists, tries to send the
    verification email and an admin notification (both best-effort), and
    redirects to the registration-success page.

    Already-authenticated users are redirected to ``index``.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method == 'POST':
        email = sanitize_input(request.form.get('email', ''), 255)
        password = request.form.get('password', '')
        name = sanitize_input(request.form.get('name', ''), 255)
        company_nip = sanitize_input(request.form.get('company_nip', ''), 10)

        # Validate email
        if not validate_email(email):
            flash('Nieprawidlowy format adresu email.', 'error')
            return render_template('auth/register.html')

        # Validate password
        password_valid, password_message = validate_password(password)
        if not password_valid:
            flash(password_message, 'error')
            return render_template('auth/register.html')

        # Validate required fields
        if not name or not email or not company_nip:
            flash('Imię, email i NIP firmy są wymagane.', 'error')
            return render_template('auth/register.html')

        # Validate full name (first + last name)
        if len(name.split()) < 2:
            flash('Podaj imię i nazwisko (np. Jan Kowalski).', 'error')
            return render_template('auth/register.html')

        # Validate NIP format and checksum
        if not re.match(r'^\d{10}$', company_nip):
            flash('NIP musi skladac sie z 10 cyfr.', 'error')
            return render_template('auth/register.html')

        if not _nip_checksum_ok(company_nip):
            flash('Nieprawidlowy NIP - blad sumy kontrolnej. Sprawdz poprawnosc numeru.', 'error')
            return render_template('auth/register.html')

        db = SessionLocal()
        try:
            # Check if user exists
            if db.query(User).filter_by(email=email).first():
                flash('Email juz jest zarejestrowany.', 'error')
                return render_template('auth/register.html')

            # Check if company is NORDA member. The NIP format was already
            # validated above, so the lookup can run unconditionally.
            company = db.query(Company).filter_by(nip=company_nip, status='active').first()
            is_norda_member = company is not None
            company_id = company.id if company else None

            # Generate verification token
            verification_token = secrets.token_urlsafe(32)
            verification_expires = datetime.now() + timedelta(hours=24)

            # Create user
            user = User(
                email=email,
                password_hash=generate_password_hash(password, method='pbkdf2:sha256'),
                name=name,
                company_nip=company_nip,
                company_id=company_id,
                company_role='NONE',
                is_norda_member=is_norda_member,
                created_at=datetime.now(),
                is_active=True,
                is_verified=False,
                verification_token=verification_token,
                verification_token_expires=verification_expires
            )
            db.add(user)
            db.flush()  # Get user.id before creating UserCompany

            # Create user_companies record if company matched
            if company_id:
                uc = UserCompany(user_id=user.id, company_id=company_id, role='NONE', is_primary=True)
                db.add(uc)

            db.commit()

            # Build verification URL
            base_url = os.getenv('APP_URL', 'https://nordabiznes.pl')
            verification_url = f"{base_url}/verify-email/{verification_token}"

            # Try to send verification email; a failure must not undo the
            # registration, so it is only logged.
            try:
                import email_service
                if email_service.is_configured():
                    success = email_service.send_welcome_email(email, name, verification_url)
                    if success:
                        logger.info(f"Verification email sent to {email}")
                    else:
                        logger.warning(f"Failed to send verification email to {email}")
                        logger.info(f"Verification token (email failed) for {email}: {verification_token[:8]}...")
                else:
                    logger.warning("Email service not configured")
                    logger.info(f"Verification token (no email) for {email}: {verification_token[:8]}...")
            except Exception as e:
                logger.error(f"Error sending verification email: {e}")
                logger.info(f"Verification token (exception) for {email}: {verification_token[:8]}...")

            logger.info(f"New user registered: {email}")

            # Send notification to admin about new registration
            try:
                company_name = company.name if company else None
                _send_registration_notification({
                    'name': name,
                    'email': email,
                    'company_nip': company_nip,
                    'company_name': company_name,
                    'is_norda_member': is_norda_member
                })
            except Exception as e:
                logger.error(f"Failed to send registration notification: {e}")

            # Store email in session for success page
            session['registered_email'] = email
            return redirect(url_for('auth.registration_success'))

        except Exception as e:
            # Discard any half-written user / user-company rows explicitly.
            db.rollback()
            logger.error(f"Registration error: {e}")
            flash('Wystapil blad podczas rejestracji. Sprobuj ponownie.', 'error')
            return render_template('auth/register.html')
        finally:
            db.close()

    return render_template('auth/register.html')
# ============================================================
# REGISTRATION SUCCESS
# ============================================================

@bp.route('/registration-success')
def registration_success():
    """Show registration success page with email verification instructions.

    The email is popped from the session (it is stored there by the
    registration view), so the page can be shown once per registration;
    without it the user is redirected to the registration form.
    """
    # Get email from session (set during registration)
    email = session.pop('registered_email', None)
    if not email:
        # No email in session - redirect to register
        return redirect(url_for('auth.register'))
    return render_template('auth/registration_success.html', email=email)


@bp.route('/check-verification-status')
def check_verification_status():
    """API endpoint to check if email has been verified (for polling from success page).

    Reads the ``email`` query parameter and returns JSON:
    400 when it is missing, 404 when no user has that email, otherwise
    ``{'verified': bool}`` plus a ``redirect`` URL once verified.

    NOTE(review): this endpoint is unauthenticated and not rate limited, and
    the 404 / 200 split reveals whether an email is registered, while the
    password-reset view deliberately avoids that. Left unchanged because the
    polling client's expectations are not visible here.
    """
    from flask import jsonify

    email = request.args.get('email', '')
    if not email:
        return jsonify({'verified': False, 'error': 'No email provided'}), 400

    db = SessionLocal()
    try:
        user = db.query(User).filter(User.email == email).first()
        if not user:
            return jsonify({'verified': False, 'error': 'User not found'}), 404
        if user.is_verified:
            return jsonify({'verified': True, 'redirect': url_for('dashboard')})
        return jsonify({'verified': False})
    finally:
        db.close()


def _safe_next_url(target):
    """Return *target* if it is a safe same-site redirect path, else None.

    A plain ``startswith('/')`` check is not enough: browsers treat
    ``//host`` and ``/\\host`` as references to another host, and they strip
    tab / newline characters from URLs before resolving them. Such values
    are rejected.
    """
    if not target or not target.startswith('/'):
        return None
    if target[1:2] in ('/', '\\'):
        return None
    if any(ord(ch) < 0x20 for ch in target):
        return None
    return target


# ============================================================
# LOGIN
# ============================================================

@bp.route('/login', methods=['GET', 'POST'])
@limiter.limit("1000 per hour" if os.getenv('FLASK_ENV') == 'development' else "60 per minute;500 per hour")
def login():
    """User login.

    GET renders the login form. POST checks, in order: required fields,
    account lockout, credentials (failed attempts are counted and the
    account is locked for 30 minutes after 5 failures), active flag and
    email verification. Users with TOTP enabled are sent to the 2FA step
    with the pending login stored in the session; everyone else is logged
    in directly and redirected to a validated ``next`` path or the
    dashboard.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method == 'POST':
        email = sanitize_input(request.form.get('email', ''), 255)
        password = request.form.get('password', '')
        remember = request.form.get('remember', False) == 'on'

        # Basic validation
        if not email or not password:
            flash('Email i haslo sa wymagane.', 'error')
            return render_template('auth/login.html')

        db = SessionLocal()
        try:
            user = db.query(User).filter_by(email=email).first()

            # Get client IP for logging
            # NOTE(review): X-Forwarded-For is client-controlled unless a
            # trusted proxy overwrites it - confirm the deployment does so,
            # since these log lines are meant for fail2ban.
            client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
            if client_ip and ',' in client_ip:
                client_ip = client_ip.split(',')[0].strip()

            # Check if account is locked
            if user and user.locked_until and user.locked_until > datetime.now():
                remaining = (user.locked_until - datetime.now()).seconds // 60 + 1
                security_logger.warning(f"LOCKED_ACCOUNT ip={client_ip} email={email}")
                flash(f'Konto tymczasowo zablokowane. Sprobuj za {remaining} minut.', 'error')
                return render_template('auth/login.html')

            # A user row without a password hash can never match; test it
            # explicitly so check_password_hash is not handed None (which
            # would raise and surface as a generic login error instead of a
            # counted failed attempt).
            credentials_ok = bool(
                user
                and user.password_hash
                and check_password_hash(user.password_hash, password)
            )

            if not credentials_ok:
                logger.warning(f"Failed login attempt for: {email}")
                security_logger.warning(f"FAILED_LOGIN ip={client_ip} email={email}")

                # Log failed login to audit
                log_audit(db, 'login_failed', 'user', entity_name=email,
                          details={'reason': 'invalid_credentials', 'ip': client_ip})
                db.commit()

                # Increment failed attempts if user exists
                if user:
                    user.failed_login_attempts = (user.failed_login_attempts or 0) + 1
                    # Lock account after 5 failed attempts (30 min lockout)
                    if user.failed_login_attempts >= 5:
                        user.locked_until = datetime.now() + timedelta(minutes=30)
                        security_logger.warning(
                            f"ACCOUNT_LOCKED ip={client_ip} email={email} attempts={user.failed_login_attempts}"
                        )
                        db.commit()
                        flash('Zbyt wiele nieudanych prob. Konto zablokowane na 30 minut.', 'error')
                        return render_template('auth/login.html')
                    db.commit()

                flash('Nieprawidlowy email lub haslo.', 'error')
                return render_template('auth/login.html')

            if not user.is_active:
                flash('Konto zostalo dezaktywowane.', 'error')
                return render_template('auth/login.html')

            # Require email verification
            if not user.is_verified:
                flash('Musisz potwierdzic adres email przed zalogowaniem. Sprawdz skrzynke.', 'error')
                return render_template('auth/login.html')

            # Reset failed attempts on successful login
            user.failed_login_attempts = 0
            user.locked_until = None

            # Only same-site paths are accepted as a post-login target.
            next_page = _safe_next_url(request.args.get('next'))

            # Check if user has 2FA enabled
            if user.totp_enabled and SECURITY_SERVICE_AVAILABLE:
                # Store pending login in session for 2FA verification
                session['2fa_pending_user_id'] = user.id
                session['2fa_remember'] = remember
                if next_page:
                    session['2fa_next'] = next_page
                else:
                    # Drop a target left over from an earlier, abandoned attempt.
                    session.pop('2fa_next', None)
                db.commit()
                logger.info(f"2FA required for user: {email}")
                return redirect(url_for('auth.verify_2fa'))

            # No 2FA - login directly
            login_user(user, remember=remember)
            session['active_company_id'] = user.company_id
            user.last_login = datetime.now()
            user.login_count = (user.login_count or 0) + 1
            _auto_link_person(db, user)

            # Log successful login to audit
            log_audit(db, 'login', 'user', entity_id=user.id, entity_name=user.email,
                      details={'method': 'password', 'ip': client_ip})
            db.commit()

            logger.info(f"User logged in: {email}")
            return redirect(next_page or url_for('dashboard'))

        except Exception as e:
            logger.error(f"Login error: {e}")
            flash('Wystapil blad podczas logowania. Sprobuj ponownie.', 'error')
            return render_template('auth/login.html')
        finally:
            db.close()

    return render_template('auth/login.html')
# ============================================================
# LOGOUT
# ============================================================

@bp.route('/logout')
@login_required
def logout():
    """User logout.

    Writes a ``logout`` audit entry, clears the ``2fa_verified`` session
    flag, logs the user out and redirects to ``index``.
    """
    # Log logout to audit. The audit entry is best-effort: a failure here
    # must not leave the user logged in, so it is logged and logout proceeds.
    db = SessionLocal()
    try:
        log_audit(db, 'logout', 'user', entity_id=current_user.id, entity_name=current_user.email)
        db.commit()
    except Exception as e:
        logger.error(f"Logout audit error: {e}")
    finally:
        db.close()

    # Clear 2FA session flag
    session.pop('2fa_verified', None)
    logout_user()
    flash('Wylogowano pomyslnie.', 'success')
    return redirect(url_for('index'))


def _two_factor_client_ip():
    """Return the client IP used in 2FA log lines.

    Takes the first entry of ``X-Forwarded-For`` when the header is present,
    otherwise ``request.remote_addr``.
    """
    client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
    if client_ip and ',' in client_ip:
        client_ip = client_ip.split(',')[0].strip()
    return client_ip


# ============================================================
# TWO-FACTOR AUTHENTICATION
# ============================================================

@bp.route('/verify-2fa', methods=['GET', 'POST'])
@limiter.limit("10 per minute")
def verify_2fa():
    """Verify 2FA code during login.

    Requires a pending login (``2fa_pending_user_id``) in the session. On
    POST the submitted code is checked as a TOTP code, or as a backup code
    when ``use_backup`` is submitted. A valid code completes the login and
    redirects to the stored ``next`` path or the dashboard; an invalid code
    is logged to the security log and the form is shown again.
    """
    # Check if there's a pending 2FA login
    pending_user_id = session.get('2fa_pending_user_id')
    if not pending_user_id:
        flash('Sesja wygasla. Zaloguj sie ponownie.', 'error')
        return redirect(url_for('login'))

    if request.method == 'POST':
        code = request.form.get('code', '').strip()
        use_backup = request.form.get('use_backup', False)

        if not code:
            flash('Wprowadz kod weryfikacyjny.', 'error')
            return render_template('auth/verify_2fa.html')

        db = SessionLocal()
        try:
            user = db.query(User).get(pending_user_id)
            if not user:
                session.pop('2fa_pending_user_id', None)
                flash('Uzytkownik nie istnieje.', 'error')
                return redirect(url_for('login'))

            # Verify code; without the security service no code can be valid.
            if SECURITY_SERVICE_AVAILABLE:
                if use_backup:
                    valid = verify_backup_code(user, code, db)
                else:
                    valid = verify_totp(user, code)
            else:
                valid = False

            if valid:
                # Clear pending login and log in
                session.pop('2fa_pending_user_id', None)
                remember = session.pop('2fa_remember', False)
                next_page = session.pop('2fa_next', None)

                # Re-validate the stored target before redirecting to it:
                # only same-site paths are allowed, and '//host' or '/\host'
                # are treated by browsers as another host.
                if next_page and (
                    not next_page.startswith('/')
                    or next_page[1:2] in ('/', '\\')
                    or any(ord(ch) < 0x20 for ch in next_page)
                ):
                    next_page = None

                login_user(user, remember=remember)
                session['active_company_id'] = user.company_id
                session['2fa_verified'] = True
                user.last_login = datetime.now()
                user.login_count = (user.login_count or 0) + 1
                _auto_link_person(db, user)

                # Log successful 2FA login to audit
                client_ip = _two_factor_client_ip()
                log_audit(db, 'login', 'user', entity_id=user.id, entity_name=user.email,
                          details={'method': '2fa', 'ip': client_ip})
                db.commit()

                logger.info(f"User logged in with 2FA: {user.email}")
                flash('Zalogowano pomyslnie.', 'success')
                return redirect(next_page or url_for('dashboard'))

            client_ip = _two_factor_client_ip()
            security_logger.warning(f"INVALID_2FA ip={client_ip} user_id={pending_user_id}")
            flash('Nieprawidlowy kod weryfikacyjny.', 'error')

        except Exception as e:
            logger.error(f"2FA verification error: {e}")
            flash('Wystapil blad. Sprobuj ponownie.', 'error')
        finally:
            db.close()

    return render_template('auth/verify_2fa.html')
@bp.route('/settings/2fa', methods=['GET', 'POST'])
@login_required
def settings_2fa():
    """2FA settings - enable/disable.

    GET renders the settings page. POST dispatches on the ``action`` field:

    * ``setup`` - generate a new TOTP secret and show the setup page
      (refused while 2FA is already enabled).
    * ``verify_setup`` - check the first code, enable 2FA and show freshly
      generated backup codes.
    * ``disable`` - requires a valid current code; clears secret and codes.
    * ``regenerate_backup`` - requires a valid current code; replaces the
      backup codes.

    Any exception is logged and the user is redirected to the dashboard.
    """
    db = SessionLocal()
    try:
        user = db.query(User).get(current_user.id)

        if request.method == 'POST':
            action = request.form.get('action')

            if action == 'setup':
                if user.totp_enabled:
                    # Starting a new setup overwrites the secret and switches
                    # 2FA off. Allowing that without a current code would let
                    # any authenticated request silently disable 2FA, so an
                    # enabled account has to go through 'disable' first.
                    flash('Uwierzytelnianie dwuskladnikowe jest juz wlaczone. '
                          'Wylacz je, aby skonfigurowac ponownie.', 'error')
                elif SECURITY_SERVICE_AVAILABLE and (secret := generate_totp_secret()):
                    # Generate new secret
                    user.totp_secret = secret
                    user.totp_enabled = False  # Not enabled until verified
                    db.commit()
                    qr_uri = get_totp_uri(user)
                    return render_template('auth/2fa_setup.html', qr_uri=qr_uri, secret=secret)
                else:
                    flash('Blad konfiguracji 2FA.', 'error')

            elif action == 'verify_setup':
                # Verify the setup code and enable 2FA
                code = request.form.get('code', '').strip()
                if not SECURITY_SERVICE_AVAILABLE or not user.totp_secret:
                    # Nothing to verify against (and get_totp_uri would not
                    # be usable for re-rendering the setup page).
                    flash('Blad konfiguracji 2FA.', 'error')
                elif verify_totp(user, code):
                    user.totp_enabled = True
                    # Generate backup codes
                    backup_codes = generate_backup_codes(8)
                    user.totp_backup_codes = backup_codes
                    db.commit()

                    # Log audit (separate commit: 2FA stays enabled even if
                    # writing the audit entry fails)
                    log_audit(db, '2fa.enabled', 'user', user.id, user.email)
                    db.commit()

                    logger.info(f"2FA enabled for user: {user.email}")
                    return render_template('auth/2fa_backup_codes.html', backup_codes=backup_codes)
                else:
                    flash('Nieprawidlowy kod. Sprobuj ponownie.', 'error')
                    qr_uri = get_totp_uri(user)
                    return render_template('auth/2fa_setup.html', qr_uri=qr_uri, secret=user.totp_secret)

            elif action == 'disable':
                # Require current code to disable
                code = request.form.get('code', '').strip()
                if SECURITY_SERVICE_AVAILABLE and verify_totp(user, code):
                    user.totp_enabled = False
                    user.totp_secret = None
                    user.totp_backup_codes = None
                    db.commit()

                    # Log audit
                    log_audit(db, '2fa.disabled', 'user', user.id, user.email)
                    db.commit()

                    logger.info(f"2FA disabled for user: {user.email}")
                    flash('Uwierzytelnianie dwuskladnikowe zostalo wylaczone.', 'success')
                else:
                    flash('Nieprawidlowy kod. Nie mozna wylaczyc 2FA.', 'error')

            elif action == 'regenerate_backup':
                # Require current code to regenerate backup codes
                code = request.form.get('code', '').strip()
                if SECURITY_SERVICE_AVAILABLE and verify_totp(user, code):
                    backup_codes = generate_backup_codes(8)
                    user.totp_backup_codes = backup_codes
                    db.commit()
                    logger.info(f"Backup codes regenerated for user: {user.email}")
                    return render_template('auth/2fa_backup_codes.html', backup_codes=backup_codes)
                else:
                    flash('Nieprawidlowy kod. Nie mozna wygenerowac kodow.', 'error')

        return render_template(
            'auth/2fa_settings.html',
            totp_enabled=user.totp_enabled,
            backup_codes_count=len(user.totp_backup_codes) if user.totp_backup_codes else 0,
        )

    except Exception as e:
        logger.error(f"2FA settings error: {e}")
        flash('Wystapil blad.', 'error')
        return redirect(url_for('dashboard'))
    finally:
        db.close()
# ============================================================
# PRIVACY SETTINGS
# ============================================================

@bp.route('/settings/privacy', methods=['GET', 'POST'])
@login_required
def settings_privacy():
    """Privacy settings page: visibility of phone/email and contact preferences.

    GET renders the form; an unset (None) visibility flag is presented as
    True. POST stores the submitted checkboxes and the optional contact
    note, then redirects back to this page. Any exception is logged and the
    user is redirected to the dashboard.
    """
    db = SessionLocal()
    try:
        account = db.query(User).get(current_user.id)

        if request.method != 'POST':
            phone_visible = account.privacy_show_phone
            if phone_visible is None:
                phone_visible = True
            email_visible = account.privacy_show_email
            if email_visible is None:
                email_visible = True
            return render_template(
                'settings/privacy.html',
                user=account,
                show_phone=phone_visible,
                show_email=email_visible,
            )

        form = request.form
        # (model attribute, form field) - a checkbox counts as ticked only
        # when it was submitted with the value 'on'.
        checkbox_fields = (
            ('privacy_show_phone', 'show_phone'),
            ('privacy_show_email', 'show_email'),
            ('contact_prefer_email', 'prefer_email'),
            ('contact_prefer_phone', 'prefer_phone'),
            ('contact_prefer_portal', 'prefer_portal'),
        )
        for attribute, field in checkbox_fields:
            setattr(account, attribute, form.get(field) == 'on')

        # A blank note is stored as None.
        note = form.get('contact_note', '').strip()
        account.contact_note = note if note else None

        db.commit()
        logger.info(f"Privacy settings updated for user: {account.email}")
        flash('Ustawienia prywatnosci zostaly zapisane.', 'success')
        return redirect(url_for('auth.settings_privacy'))

    except Exception as e:
        logger.error(f"Privacy settings error: {e}")
        flash('Wystapil blad.', 'error')
        return redirect(url_for('dashboard'))
    finally:
        db.close()
# ============================================================
# BLOCK SETTINGS
# ============================================================

@bp.route('/settings/blocks', methods=['GET'])
@login_required
def settings_blocks():
    """Render the blocked-users page.

    Passes the current user's blocks and the list of active users that can
    still be blocked (everyone except the current user and users already
    blocked), ordered by name.
    """
    db = SessionLocal()
    try:
        my_blocks = db.query(UserBlock).filter(
            UserBlock.user_id == current_user.id
        ).all()

        # Users that must not be offered for blocking: already blocked + self.
        excluded_ids = [entry.blocked_user_id for entry in my_blocks] + [current_user.id]

        candidates_query = db.query(User).filter(
            User.id.notin_(excluded_ids),
            User.is_active == True
        )
        candidates = candidates_query.order_by(User.name).all()

        return render_template('settings/blocks.html', blocks=my_blocks, available_users=candidates)
    finally:
        db.close()


@bp.route('/settings/blocks/add', methods=['POST'])
@login_required
def settings_blocks_add():
    """Block the user given by the ``user_id`` form field.

    Rejects a missing id and the current user's own id, reports an already
    existing block, otherwise stores a new block with the optional
    ``reason``. Always redirects back to the blocks page.
    """
    blocks_page = url_for('auth.settings_blocks')
    target_id = request.form.get('user_id', type=int)
    reason_text = request.form.get('reason', '').strip()

    if not target_id or target_id == current_user.id:
        flash('Nieprawidlowy uzytkownik.', 'error')
        return redirect(blocks_page)

    db = SessionLocal()
    try:
        # Check if already blocked
        already_blocked = db.query(UserBlock).filter(
            UserBlock.user_id == current_user.id,
            UserBlock.blocked_user_id == target_id
        ).first()

        if already_blocked:
            flash('Ten uzytkownik jest juz zablokowany.', 'warning')
            return redirect(blocks_page)

        db.add(UserBlock(
            user_id=current_user.id,
            blocked_user_id=target_id,
            reason=reason_text or None
        ))
        db.commit()

        logger.info(f"User {current_user.id} blocked user {target_id}")
        flash('Uzytkownik zostal zablokowany.', 'success')
    except Exception as e:
        logger.error(f"Error blocking user: {e}")
        flash('Wystapil blad.', 'error')
    finally:
        db.close()

    return redirect(blocks_page)
# The URL rule must carry the block id: the view takes a required
# ``block_id`` argument, which Flask only supplies from a URL variable.
@bp.route('/settings/blocks/remove/<int:block_id>', methods=['POST'])
@login_required
def settings_blocks_remove(block_id):
    """Unblock a user.

    Deletes the block with id *block_id* only when it belongs to the
    current user, then redirects back to the blocks page.
    """
    db = SessionLocal()
    try:
        block = db.query(UserBlock).filter(
            UserBlock.id == block_id,
            UserBlock.user_id == current_user.id
        ).first()

        if not block:
            flash('Blokada nie istnieje.', 'error')
            return redirect(url_for('auth.settings_blocks'))

        blocked_user_id = block.blocked_user_id
        db.delete(block)
        db.commit()

        logger.info(f"User {current_user.id} unblocked user {blocked_user_id}")
        flash('Uzytkownik zostal odblokowany.', 'success')
    except Exception as e:
        logger.error(f"Error unblocking user: {e}")
        flash('Wystapil blad.', 'error')
    finally:
        db.close()

    return redirect(url_for('auth.settings_blocks'))


# ============================================================
# MOJE KONTO - User Account Settings
# ============================================================

@bp.route('/konto')
@login_required
def konto_dane():
    """User profile - edit personal data"""
    return render_template('konto/dane.html')


@bp.route('/konto', methods=['POST'])
@login_required
def konto_dane_post():
    """Save user profile changes.

    Stores the sanitized ``name`` and ``phone`` form fields (empty values
    become None) and mirrors them onto ``current_user``. Always redirects
    back to the profile page.
    """
    db = SessionLocal()
    try:
        user = db.query(User).filter_by(id=current_user.id).first()
        if user:
            name = sanitize_input(request.form.get('name', ''), 255)
            phone = sanitize_input(request.form.get('phone', ''), 50)

            user.name = name if name else None
            user.phone = phone if phone else None
            db.commit()

            # Update current_user session
            current_user.name = user.name
            current_user.phone = user.phone

            logger.info(f"Profile updated for user: {user.email}")
            flash('Dane zostaly zapisane.', 'success')
    except Exception as e:
        db.rollback()
        logger.error(f"Profile update error: {e}")
        flash('Wystapil blad podczas zapisywania.', 'error')
    finally:
        db.close()

    return redirect(url_for('auth.konto_dane'))


@bp.route('/konto/avatar', methods=['POST'])
@login_required
def konto_avatar_upload():
    """Upload or replace user avatar photo.

    The uploaded ``avatar`` file is validated with
    ``FileUploadService.validate_file``, rotated according to its EXIF
    orientation, centre-cropped to a square, resized to 300x300 and saved
    under ``static/uploads/avatars/<year>/<month>/<uuid>.<ext>``. The path
    relative to ``static`` is stored on the user. The previous avatar file
    is removed only after the new path has been committed; if anything
    fails, the newly written file is removed instead.
    """
    from file_upload_service import FileUploadService, _BASE_DIR
    from PIL import Image, ImageOps
    import uuid

    file = request.files.get('avatar')
    if not file or file.filename == '':
        flash('Nie wybrano pliku.', 'error')
        return redirect(url_for('auth.konto_dane'))

    # Validate using existing service
    is_valid, error = FileUploadService.validate_file(file)
    if not is_valid:
        flash(error, 'error')
        return redirect(url_for('auth.konto_dane'))

    static_dir = os.path.join(_BASE_DIR, 'static')
    written_path = None  # new file that must be cleaned up if we fail

    db = SessionLocal()
    try:
        user = db.query(User).filter_by(id=current_user.id).first()
        if not user:
            flash('Nie znaleziono użytkownika.', 'error')
            return redirect(url_for('auth.konto_dane'))

        # Generate filename and path
        ext = file.filename.rsplit('.', 1)[-1].lower()
        if ext == 'jpeg':
            ext = 'jpg'
        stored_filename = f"{uuid.uuid4()}.{ext}"

        now = datetime.now()
        avatar_dir = os.path.join(static_dir, 'uploads', 'avatars', str(now.year), f"{now.month:02d}")
        os.makedirs(avatar_dir, exist_ok=True)
        file_path = os.path.join(avatar_dir, stored_filename)

        # validate_file may have read from the stream (not visible here);
        # rewinding is harmless either way.
        file.seek(0)

        with Image.open(file) as source_img:
            # The saved copy carries no EXIF, so the orientation tag has to
            # be applied to the pixels first - otherwise photos taken in
            # portrait mode end up rotated.
            img = ImageOps.exif_transpose(source_img) or source_img

            if img.mode in ('RGBA', 'LA', 'P') and ext == 'jpg':
                img = img.convert('RGB')
            elif img.mode not in ('RGB', 'RGBA'):
                img = img.convert('RGB')

            # Center crop to square
            w, h = img.size
            side = min(w, h)
            left = (w - side) // 2
            top = (h - side) // 2
            img = img.crop((left, top, left + side, top + side))
            img = img.resize((300, 300), Image.LANCZOS)

            # Save
            save_kwargs = {'optimize': True}
            if ext == 'jpg':
                save_kwargs['quality'] = 85
            written_path = file_path
            img.save(file_path, **save_kwargs)

        # Save relative path
        previous_avatar = user.avatar_path
        user.avatar_path = os.path.relpath(file_path, static_dir)
        db.commit()
        written_path = None  # committed - the new file is now the avatar
        current_user.avatar_path = user.avatar_path

        # Delete old avatar only now, so a failed commit cannot leave the
        # database pointing at a file that no longer exists.
        if previous_avatar:
            old_path = os.path.join(static_dir, previous_avatar)
            try:
                os.remove(old_path)
            except FileNotFoundError:
                pass
            except OSError as remove_err:
                logger.warning(f"Could not remove old avatar {old_path}: {remove_err}")

        logger.info(f"Avatar uploaded for user {user.id}: {user.avatar_path}")
        flash('Zdjęcie profilowe zostało zapisane.', 'success')
    except Exception as e:
        db.rollback()
        # Do not leave an unreferenced file behind.
        if written_path and os.path.exists(written_path):
            try:
                os.remove(written_path)
            except OSError:
                pass
        logger.error(f"Avatar upload error: {e}")
        flash('Wystąpił błąd podczas zapisywania zdjęcia.', 'error')
    finally:
        db.close()

    return redirect(url_for('auth.konto_dane'))
@bp.route('/konto/avatar/delete', methods=['POST'])
@login_required
def konto_avatar_delete():
    """Delete user avatar photo.

    Clears ``avatar_path`` on the user, commits, and then removes the file
    from ``static``. Always redirects back to the profile page.
    """
    from file_upload_service import _BASE_DIR

    db = SessionLocal()
    try:
        user = db.query(User).filter_by(id=current_user.id).first()
        if user and user.avatar_path:
            old_path = os.path.join(_BASE_DIR, 'static', user.avatar_path)

            # Commit first: if the commit fails, the file is still there and
            # the stored path stays valid.
            user.avatar_path = None
            db.commit()
            current_user.avatar_path = None

            # Delete file
            try:
                os.remove(old_path)
            except FileNotFoundError:
                pass
            except OSError as remove_err:
                logger.warning(f"Could not remove avatar file {old_path}: {remove_err}")

            logger.info(f"Avatar deleted for user {user.id}")
            flash('Zdjęcie profilowe zostało usunięte.', 'success')
    except Exception as e:
        db.rollback()
        logger.error(f"Avatar delete error: {e}")
        flash('Wystąpił błąd podczas usuwania zdjęcia.', 'error')
    finally:
        db.close()

    return redirect(url_for('auth.konto_dane'))


@bp.route('/konto/prywatnosc', methods=['GET', 'POST'])
@login_required
def konto_prywatnosc():
    """Privacy settings - control visibility of phone and email.

    GET renders the form; an unset (None) visibility flag is presented as
    True. POST stores the submitted checkboxes (visibility, contact
    preferences, email notifications for messages) and redirects back to
    this page. Any exception is logged and the user is redirected to the
    profile page.
    """
    db = SessionLocal()
    try:
        user = db.query(User).filter_by(id=current_user.id).first()

        if request.method == 'POST':
            # A checkbox counts as ticked only when submitted with 'on'.
            user.privacy_show_phone = request.form.get('show_phone') == 'on'
            user.privacy_show_email = request.form.get('show_email') == 'on'
            user.contact_prefer_email = request.form.get('prefer_email') == 'on'
            user.contact_prefer_phone = request.form.get('prefer_phone') == 'on'
            user.contact_prefer_portal = request.form.get('prefer_portal') == 'on'
            user.notify_email_messages = request.form.get('notify_email_messages') == 'on'
            db.commit()

            logger.info(f"Privacy settings updated for user: {user.email}")
            flash('Ustawienia prywatności zostały zapisane.', 'success')
            return redirect(url_for('auth.konto_prywatnosc'))

        return render_template(
            'konto/prywatnosc.html',
            user=user,
            show_phone=user.privacy_show_phone if user.privacy_show_phone is not None else True,
            show_email=user.privacy_show_email if user.privacy_show_email is not None else True,
        )
    except Exception as e:
        db.rollback()
        logger.error(f"Privacy settings error: {e}")
        flash('Wystapil blad.', 'error')
        return redirect(url_for('auth.konto_dane'))
    finally:
        db.close()
@bp.route('/konto/bezpieczenstwo')
@login_required
def konto_bezpieczenstwo():
    """Render the account security page (2FA, password)."""
    template_name = 'konto/bezpieczenstwo.html'
    return render_template(template_name)


@bp.route('/konto/blokady')
@login_required
def konto_blokady():
    """Render the account page for managing blocked users.

    Passes the current user's blocks and the active users that can still be
    blocked (everyone except the current user and users already blocked),
    ordered by name. On any exception the error is logged and the user is
    redirected to the profile page.
    """
    db = SessionLocal()
    try:
        my_blocks = db.query(UserBlock).filter_by(user_id=current_user.id).all()

        # Already blocked users plus the current user are not offered.
        excluded_ids = [entry.blocked_user_id for entry in my_blocks] + [current_user.id]

        candidates_query = db.query(User).filter(
            User.id.notin_(excluded_ids),
            User.is_active == True
        )
        candidates = candidates_query.order_by(User.name).all()

        return render_template('konto/blokady.html', blocks=my_blocks, available_users=candidates)
    except Exception as e:
        logger.error(f"Blocks page error: {e}")
        flash('Wystapil blad.', 'error')
        return redirect(url_for('auth.konto_dane'))
    finally:
        db.close()


@bp.route('/konto/blokady/dodaj', methods=['POST'])
@login_required
def konto_blokady_dodaj():
    """Block the user given by the ``user_id`` form field.

    Rejects a missing id and the current user's own id, reports an already
    existing block, otherwise stores a new block. Always redirects back to
    the account blocks page.
    """
    db = SessionLocal()
    try:
        target_id = request.form.get('user_id', type=int)
        is_self = target_id == current_user.id
        if not target_id or is_self:
            flash('Nieprawidlowy uzytkownik.', 'error')
            return redirect(url_for('auth.konto_blokady'))

        already_blocked = db.query(UserBlock).filter_by(
            user_id=current_user.id,
            blocked_user_id=target_id
        ).first()
        if already_blocked:
            flash('Ten uzytkownik jest juz zablokowany.', 'info')
            return redirect(url_for('auth.konto_blokady'))

        new_block = UserBlock(user_id=current_user.id, blocked_user_id=target_id)
        db.add(new_block)
        db.commit()

        logger.info(f"User {current_user.id} blocked user {target_id}")
        flash('Uzytkownik zostal zablokowany.', 'success')
    except Exception as e:
        logger.error(f"Error blocking user: {e}")
        flash('Wystapil blad.', 'error')
    finally:
        db.close()

    return redirect(url_for('auth.konto_blokady'))
@bp.route('/konto/integracje')
@login_required
def konto_integracje():
    """OAuth integrations page for MANAGER+.

    Allows connecting Google/Meta accounts for enriched audit data.

    Requires the MANAGER role and a company assigned to the current user;
    otherwise the user is redirected to the profile page with a message.
    Renders the company settings template in user view with the company's
    connected services and a flag per provider telling whether its client
    credentials are configured.
    """
    if not current_user.has_role(SystemRole.MANAGER):
        flash('Ta strona wymaga uprawnień kadry zarządzającej.', 'error')
        return redirect(url_for('auth.konto_dane'))

    if not current_user.company_id:
        flash('Musisz byc przypisany do firmy, aby korzystac z integracji.', 'info')
        return redirect(url_for('auth.konto_dane'))

    db = SessionLocal()
    try:
        company = db.query(Company).filter(Company.id == current_user.company_id).first()
        if not company:
            flash('Firma nie istnieje.', 'error')
            return redirect(url_for('auth.konto_dane'))

        from oauth_service import OAuthService
        oauth = OAuthService()
        connections = oauth.get_connected_services(db, current_user.company_id)

        # A provider is offered only when its client id / app id is set.
        oauth_available = {
            'google': bool(oauth.google_client_id),
            'meta': bool(oauth.meta_app_id),
        }

        return render_template(
            'admin/company_settings.html',
            company=company,
            connections=connections,
            oauth_available=oauth_available,
            is_user_view=True,
        )
    finally:
        db.close()


# The URL rule must carry the block id: the view takes a required
# ``block_id`` argument, which Flask only supplies from a URL variable.
@bp.route('/konto/blokady/usun/<int:block_id>', methods=['POST'])
@login_required
def konto_blokady_usun(block_id):
    """Unblock a user.

    Deletes the block with id *block_id* only when it belongs to the
    current user, then redirects back to the account blocks page.
    """
    db = SessionLocal()
    try:
        block = db.query(UserBlock).filter(
            UserBlock.id == block_id,
            UserBlock.user_id == current_user.id
        ).first()

        if not block:
            flash('Blokada nie istnieje.', 'error')
            return redirect(url_for('auth.konto_blokady'))

        blocked_user_id = block.blocked_user_id
        db.delete(block)
        db.commit()

        logger.info(f"User {current_user.id} unblocked user {blocked_user_id}")
        flash('Uzytkownik zostal odblokowany.', 'success')
    except Exception as e:
        logger.error(f"Error unblocking user: {e}")
        flash('Wystapil blad.', 'error')
    finally:
        db.close()

    return redirect(url_for('auth.konto_blokady'))
# ============================================================
# PASSWORD RESET
# ============================================================

@bp.route('/forgot-password', methods=['GET', 'POST'])
@limiter.limit("20 per hour")
def forgot_password():
    """Handle the "forgot password" form.

    GET renders the form. POST validates the email; for an active user with
    that email a one-hour reset token is stored and a reset link is emailed
    (sending is best-effort and only logged on failure). The same flash
    message is shown whether or not the email exists, then the user is
    redirected to the login page. Authenticated users are redirected to
    ``index``.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    form_template = 'auth/forgot_password.html'
    if request.method != 'POST':
        return render_template(form_template)

    email = sanitize_input(request.form.get('email', ''), 255)
    if not validate_email(email):
        flash('Nieprawidlowy format adresu email.', 'error')
        return render_template(form_template)

    db = SessionLocal()
    try:
        account = db.query(User).filter_by(email=email, is_active=True).first()

        if account:
            # Generate reset token and save it to the database
            reset_token = secrets.token_urlsafe(32)
            account.reset_token = reset_token
            account.reset_token_expires = datetime.now() + timedelta(hours=1)
            db.commit()

            # Build reset URL
            site_root = os.getenv('APP_URL', 'https://nordabiznes.pl')
            reset_url = f"{site_root}/reset-password/{reset_token}"

            # Only a short prefix of the token is ever written to the log.
            token_hint = reset_token[:8]

            # Try to send email
            try:
                import email_service
                if not email_service.is_configured():
                    logger.warning("Email service not configured")
                    logger.info(f"Reset token (no email) for {email}: {token_hint}...")
                elif email_service.send_password_reset_email(email, reset_url):
                    logger.info(f"Password reset email sent to {email}")
                else:
                    logger.warning(f"Failed to send password reset email to {email}")
                    logger.info(f"Reset token (email failed) for {email}: {token_hint}...")
            except Exception as e:
                logger.error(f"Error sending reset email: {e}")
                logger.info(f"Reset token (exception) for {email}: {token_hint}...")

        # Always show same message to prevent email enumeration
        flash('Jesli email istnieje w systemie, instrukcje resetowania hasla zostaly wyslane.', 'info')
        return redirect(url_for('login'))

    except Exception as e:
        logger.error(f"Password reset error: {e}")
        flash('Wystapil blad. Sprobuj ponownie.', 'error')
    finally:
        db.close()

    # Reached only after an error above: show the form again.
    return render_template(form_template)
Sprobuj ponownie.', 'error') finally: db.close() return render_template('auth/forgot_password.html') @bp.route('/reset-password/', methods=['GET', 'POST']) @limiter.limit("30 per hour") def reset_password(token): """Reset password with token""" if current_user.is_authenticated: return redirect(url_for('index')) db = SessionLocal() try: # Find user with valid token user = db.query(User).filter( User.reset_token == token, User.reset_token_expires > datetime.now(), User.is_active == True ).first() if not user: flash('Link resetowania hasla jest nieprawidlowy lub wygasl.', 'error') return redirect(url_for('auth.forgot_password')) if request.method == 'POST': password = request.form.get('password', '') password_confirm = request.form.get('password_confirm', '') # Validate passwords match if password != password_confirm: flash('Hasla nie sa identyczne.', 'error') return render_template('auth/reset_password.html', token=token) # Validate password strength password_valid, password_message = validate_password(password) if not password_valid: flash(password_message, 'error') return render_template('auth/reset_password.html', token=token) # Update password and clear reset token user.password_hash = generate_password_hash(password, method='pbkdf2:sha256') user.reset_token = None user.reset_token_expires = None # Auto-verify email - user proved inbox access by using reset link if not user.is_verified: user.is_verified = True user.verified_at = datetime.now() user.verification_token = None user.verification_token_expires = None logger.info(f"Email auto-verified via password reset for {user.email}") # Notify admin if this is a first-time activation (user never logged in) first_activation = user.last_login is None db.commit() logger.info(f"Password reset successful for {user.email}") if first_activation: try: import email_service email_service.send_email( to=['maciej.pienczyn@inpi.pl'], subject=f'Aktywacja konta: {user.name}', body_text=f'{user.name} ({user.email}) właśnie 
ustawił(a) hasło i aktywował(a) swoje konto w portalu Norda Biznes Partner.', email_type='admin_notification', bcc=[] ) except Exception as notify_err: logger.warning(f"Failed to send activation notification: {notify_err}") flash('Haslo zostalo zmienione. Mozesz sie teraz zalogowac.', 'success') return redirect(url_for('login')) return render_template('auth/reset_password.html', token=token) except Exception as e: logger.error(f"Reset password error: {e}") flash('Wystapil blad. Sprobuj ponownie.', 'error') return redirect(url_for('auth.forgot_password')) finally: db.close() # ============================================================ # EMAIL VERIFICATION # ============================================================ @bp.route('/verify-email/') def verify_email(token): """Verify email address with token""" db = SessionLocal() try: user = db.query(User).filter( User.verification_token == token, User.verification_token_expires > datetime.now(), User.is_active == True ).first() if not user: flash('Link weryfikacyjny jest nieprawidlowy lub wygasl.', 'error') return redirect(url_for('login')) if user.is_verified: # Already verified - auto-login if not logged in if not current_user.is_authenticated: login_user(user, remember=True) user.last_login = datetime.now() user.login_count = (user.login_count or 0) + 1 db.commit() flash('Witamy ponownie! 
Zostales automatycznie zalogowany.', 'info') return redirect(url_for('dashboard')) flash('Email zostal juz zweryfikowany.', 'info') return redirect(url_for('dashboard')) # Verify user (keep token until natural expiry so double-clicks # and email client prefetch don't show a confusing error) user.is_verified = True user.verified_at = datetime.now() # Auto-login the user after verification login_user(user, remember=True) user.last_login = datetime.now() user.login_count = (user.login_count or 0) + 1 # Log successful verification and login client_ip = request.headers.get('X-Forwarded-For', request.remote_addr) if client_ip and ',' in client_ip: client_ip = client_ip.split(',')[0].strip() log_audit(db, 'email_verified', 'user', entity_id=user.id, entity_name=user.email, details={'ip': client_ip}) log_audit(db, 'login', 'user', entity_id=user.id, entity_name=user.email, details={'method': 'email_verification', 'ip': client_ip}) db.commit() logger.info(f"Email verified and auto-logged in: {user.email}") flash('Email zweryfikowany! 
Witamy w Norda Biznes Partner.', 'success') return redirect(url_for('dashboard')) except Exception as e: logger.error(f"Email verification error: {e}") flash('Wystapil blad podczas weryfikacji.', 'error') return redirect(url_for('login')) finally: db.close() @bp.route('/resend-verification', methods=['GET', 'POST']) @limiter.limit("15 per hour") def resend_verification(): """Resend email verification link""" if current_user.is_authenticated: return redirect(url_for('index')) if request.method == 'POST': email = sanitize_input(request.form.get('email', ''), 255) if not validate_email(email): flash('Nieprawidlowy format adresu email.', 'error') return render_template('auth/resend_verification.html') db = SessionLocal() try: user = db.query(User).filter_by(email=email, is_active=True).first() if user and not user.is_verified: # Generate new verification token verification_token = secrets.token_urlsafe(32) verification_expires = datetime.now() + timedelta(hours=24) # Update user token user.verification_token = verification_token user.verification_token_expires = verification_expires db.commit() # Build verification URL base_url = os.getenv('APP_URL', 'https://nordabiznes.pl') verification_url = f"{base_url}/verify-email/{verification_token}" # Try to send email try: import email_service if email_service.is_configured(): success = email_service.send_welcome_email(email, user.name, verification_url) if success: logger.info(f"Verification email resent to {email}") else: logger.warning(f"Failed to resend verification email to {email}") logger.info(f"Resend verification token (email failed) for {email}: {verification_token[:8]}...") else: logger.warning("Email service not configured") logger.info(f"Resend verification token (no email) for {email}: {verification_token[:8]}...") except Exception as e: logger.error(f"Error resending verification email: {e}") logger.info(f"Resend verification token (exception) for {email}: {verification_token[:8]}...") # Always show same 
message to prevent email enumeration flash('Jesli konto istnieje i nie zostalo zweryfikowane, email weryfikacyjny zostal wyslany.', 'info') return redirect(url_for('login')) except Exception as e: logger.error(f"Resend verification error: {e}") flash('Wystapil blad. Sprobuj ponownie.', 'error') finally: db.close() return render_template('auth/resend_verification.html')