nordabiz/blueprints/auth/routes.py
Maciej Pienczyn c46d4794e2
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
fix: keep verification token until natural expiry
Prevents confusing "link invalid" error when verification link is
clicked twice (or prefetched by email clients like Outlook). The token
now expires naturally instead of being cleared on first use.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 18:30:07 +01:00

1352 lines
50 KiB
Python

"""
Auth Routes
============
Authentication routes: login, logout, register, password reset, email verification, 2FA.
"""
import os
import re
import logging
import secrets
from datetime import datetime, timedelta
from flask import render_template, request, redirect, url_for, flash, session, current_app
from flask_login import login_required, login_user, logout_user, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from . import bp
from database import SessionLocal, User, Company, UserBlock, UserCompany, SystemRole
from utils.helpers import sanitize_input, validate_email, validate_password
from extensions import limiter
from security_service import log_audit
# Module-level logger for general messages emitted by this blueprint.
logger = logging.getLogger(__name__)
# Security logger (same name as in app.py for fail2ban integration)
security_logger = logging.getLogger('security')
# Security service availability check: the TOTP / backup-code helpers are
# imported optionally and SECURITY_SERVICE_AVAILABLE records whether that
# import succeeded. The 2FA routes consult the flag before calling them.
# NOTE(review): log_audit is also imported unconditionally at the top of the
# file, so a missing security_service would presumably fail there before this
# guard is ever reached - confirm whether the guard is still meaningful.
try:
    from security_service import (
        generate_totp_secret, get_totp_uri, verify_totp,
        generate_backup_codes, verify_backup_code, log_audit
    )
    SECURITY_SERVICE_AVAILABLE = True
except ImportError:
    SECURITY_SERVICE_AVAILABLE = False
    logger.warning("Security service not available in auth blueprint")
def _auto_link_person(db, user):
"""Auto-link User to Person record by name match (if not already linked)."""
if user.person_id or not user.name:
return
try:
from database import Person
name_parts = user.name.strip().split()
if len(name_parts) < 2:
return
# Try exact match: first name + last name (case-insensitive)
first_name = name_parts[0].upper()
last_name = name_parts[-1].upper()
from sqlalchemy import func
person = db.query(Person).filter(
func.upper(Person.nazwisko) == last_name,
func.upper(Person.imiona).like(f'{first_name}%'),
).first()
if person:
user.person_id = person.id
logger.info(f"Auto-linked user {user.id} ({user.name}) to person {person.id}")
except Exception as e:
logger.error(f"Auto-link person failed for user {user.id}: {e}")
def _send_registration_notification(user_info):
    """Send an e-mail notification when a new user registers.

    Args:
        user_info: dict read with ``.get`` for the keys ``name``, ``email``,
            ``company_nip``, ``company_name`` and ``is_norda_member``.

    Returns:
        None. The function is best-effort: when the e-mail service is not
        configured, when no recipient is set, or when sending raises, it only
        logs and returns.
    """
    try:
        from email_service import send_email, is_configured
        if not is_configured():
            logger.warning("Email service not configured - skipping registration notification")
            return
        notify_email = os.getenv('ERROR_NOTIFY_EMAIL', 'maciej.pienczyn@inpi.pl')
        if not notify_email:
            return
        reg_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        is_member = "TAK" if user_info.get('is_norda_member') else "NIE"
        # The key may be present with a None value (no matching company), in
        # which case a .get() default would not apply and the e-mail would
        # read "Firma: None"; `or` covers both the missing and the None case.
        company_name = user_info.get('company_name') or 'Brak przypisanej firmy'
        subject = f"NordaBiz: Nowa rejestracja - {user_info.get('name', 'Nieznany')}"
        body_text = f"""NOWA REJESTRACJA NA NORDABIZNES.PL
{'='*50}
Czas: {reg_time}
Imie: {user_info.get('name', 'N/A')}
Email: {user_info.get('email', 'N/A')}
NIP: {user_info.get('company_nip', 'N/A')}
Firma: {company_name}
Czlonek NORDA: {is_member}
{'='*50}
"""
        send_email(
            to=[notify_email],
            subject=subject,
            body_text=body_text,
            email_type='registration_notification'
        )
        logger.info(f"Registration notification sent for {user_info.get('email')}")
    except Exception as e:
        logger.error(f"Failed to send registration notification: {e}")
# ============================================================
# REGISTRATION
# ============================================================
def _nip_checksum_matches(nip):
    """Return True when the check digit of a 10-digit NIP string is valid.

    The first nine digits are multiplied by fixed weights, summed and reduced
    modulo 11; the result must equal the tenth digit. A remainder of 10 can
    never equal a single digit, so such numbers are rejected.
    """
    weights = (6, 5, 7, 2, 3, 4, 5, 6, 7)
    checksum = sum(int(digit) * weight for digit, weight in zip(nip, weights)) % 11
    return checksum == int(nip[9])


@bp.route('/register', methods=['GET', 'POST'])
@limiter.limit("50 per hour;200 per day")
def register():
    """User registration.

    GET renders the registration form. POST validates e-mail, password,
    full name and company NIP, creates an unverified User (linked to an
    active Company with the same NIP when one exists), tries to send a
    verification e-mail and an admin notification, and redirects to the
    registration-success page. Validation failures re-render the form with
    a flashed error message.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))
    if request.method != 'POST':
        return render_template('auth/register.html')

    email = sanitize_input(request.form.get('email', ''), 255)
    password = request.form.get('password', '')
    name = sanitize_input(request.form.get('name', ''), 255)
    company_nip = sanitize_input(request.form.get('company_nip', ''), 10)

    # Validate email
    if not validate_email(email):
        flash('Nieprawidlowy format adresu email.', 'error')
        return render_template('auth/register.html')
    # Validate password
    password_valid, password_message = validate_password(password)
    if not password_valid:
        flash(password_message, 'error')
        return render_template('auth/register.html')
    # Validate required fields
    if not name or not email or not company_nip:
        flash('Imię, email i NIP firmy są wymagane.', 'error')
        return render_template('auth/register.html')
    # Validate full name (first + last name)
    if len(name.split()) < 2:
        flash('Podaj imię i nazwisko (np. Jan Kowalski).', 'error')
        return render_template('auth/register.html')
    # Validate NIP format and checksum. fullmatch with an explicit ASCII class
    # rejects a trailing newline and non-ASCII digits that `^\d{10}$` accepts.
    if not re.fullmatch(r'[0-9]{10}', company_nip):
        flash('NIP musi skladac sie z 10 cyfr.', 'error')
        return render_template('auth/register.html')
    if not _nip_checksum_matches(company_nip):
        flash('Nieprawidlowy NIP - blad sumy kontrolnej. Sprawdz poprawnosc numeru.', 'error')
        return render_template('auth/register.html')

    db = SessionLocal()
    try:
        # Check if user exists
        if db.query(User).filter_by(email=email).first():
            flash('Email juz jest zarejestrowany.', 'error')
            return render_template('auth/register.html')

        # Check if company is NORDA member. The NIP format was validated
        # above, so the lookup always runs and `company` is always bound.
        company = db.query(Company).filter_by(nip=company_nip, status='active').first()
        is_norda_member = bool(company)
        company_id = company.id if company else None

        # Generate verification token
        verification_token = secrets.token_urlsafe(32)
        verification_expires = datetime.now() + timedelta(hours=24)

        # Create user
        user = User(
            email=email,
            password_hash=generate_password_hash(password, method='pbkdf2:sha256'),
            name=name,
            company_nip=company_nip,
            company_id=company_id,
            company_role='NONE',
            is_norda_member=is_norda_member,
            created_at=datetime.now(),
            is_active=True,
            is_verified=False,
            verification_token=verification_token,
            verification_token_expires=verification_expires
        )
        db.add(user)
        db.flush()  # Get user.id before creating UserCompany

        # Create user_companies record if company matched
        if company_id:
            uc = UserCompany(user_id=user.id, company_id=company_id, role='NONE', is_primary=True)
            db.add(uc)
        db.commit()

        # Build verification URL; strip a trailing slash from APP_URL so the
        # link never contains a double slash.
        base_url = os.getenv('APP_URL', 'https://nordabiznes.pl').rstrip('/')
        verification_url = f"{base_url}/verify-email/{verification_token}"

        # Try to send verification email (best-effort: the account already exists)
        try:
            import email_service
            if email_service.is_configured():
                success = email_service.send_welcome_email(email, name, verification_url)
                if success:
                    logger.info(f"Verification email sent to {email}")
                else:
                    logger.warning(f"Failed to send verification email to {email}")
                    logger.info(f"Verification token (email failed) for {email}: {verification_token[:8]}...")
            else:
                logger.warning("Email service not configured")
                logger.info(f"Verification token (no email) for {email}: {verification_token[:8]}...")
        except Exception as e:
            logger.error(f"Error sending verification email: {e}")
            logger.info(f"Verification token (exception) for {email}: {verification_token[:8]}...")

        logger.info(f"New user registered: {email}")

        # Send notification to admin about new registration
        try:
            company_name = company.name if company else None
            _send_registration_notification({
                'name': name,
                'email': email,
                'company_nip': company_nip,
                'company_name': company_name,
                'is_norda_member': is_norda_member
            })
        except Exception as e:
            logger.error(f"Failed to send registration notification: {e}")

        # Store email in session for success page
        session['registered_email'] = email
        return redirect(url_for('auth.registration_success'))
    except Exception as e:
        # Discard any half-written user / user_company rows explicitly.
        db.rollback()
        logger.error(f"Registration error: {e}")
        flash('Wystapil blad podczas rejestracji. Sprobuj ponownie.', 'error')
        return render_template('auth/register.html')
    finally:
        db.close()
# ============================================================
# REGISTRATION SUCCESS
# ============================================================
@bp.route('/registration-success')
def registration_success():
    """Render the post-registration page that explains e-mail verification.

    The address is taken (and removed) from the session key written by the
    registration handler; without it the visitor is sent to the form.
    """
    registered_email = session.pop('registered_email', None)
    if registered_email:
        return render_template('auth/registration_success.html', email=registered_email)
    # Nothing stored in the session - go back to the registration form.
    return redirect(url_for('auth.register'))
@bp.route('/check-verification-status')
def check_verification_status():
    """JSON endpoint reporting whether the given e-mail has been verified.

    Reads the ``email`` query parameter. Responds 400 when it is missing,
    404 when no user has that address, otherwise ``{'verified': bool}``
    (plus a ``redirect`` URL once verified).
    """
    from flask import jsonify

    requested_email = request.args.get('email', '')
    if not requested_email:
        return jsonify({'verified': False, 'error': 'No email provided'}), 400

    db = SessionLocal()
    try:
        account = db.query(User).filter(User.email == requested_email).first()
        if not account:
            return jsonify({'verified': False, 'error': 'User not found'}), 404
        if not account.is_verified:
            return jsonify({'verified': False})
        return jsonify({'verified': True, 'redirect': url_for('dashboard')})
    finally:
        db.close()
# ============================================================
# LOGIN
# ============================================================
def _safe_next_path(target):
    """Return *target* if it is a local path safe to redirect to, else None.

    Only values starting with a single '/' are accepted. '//host' and
    '/\\host' are rejected because browsers resolve them as scheme-relative
    URLs pointing at another site (open redirect).
    """
    if not target or not target.startswith('/'):
        return None
    if target.startswith(('//', '/\\')):
        return None
    return target


@bp.route('/login', methods=['GET', 'POST'])
@limiter.limit("1000 per hour" if os.getenv('FLASK_ENV') == 'development' else "60 per minute;500 per hour")
def login():
    """User login.

    GET renders the login form. POST checks the account lock, the password,
    the active flag and e-mail verification. Five failed attempts lock the
    account for 30 minutes. When the user has TOTP enabled (and the security
    service is available) the login is parked in the session and the user is
    redirected to the 2FA step; otherwise the user is logged in directly and
    redirected to the validated ``next`` path or the dashboard.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))
    if request.method == 'POST':
        email = sanitize_input(request.form.get('email', ''), 255)
        password = request.form.get('password', '')
        remember = request.form.get('remember', False) == 'on'
        # Basic validation
        if not email or not password:
            flash('Email i haslo sa wymagane.', 'error')
            return render_template('auth/login.html')
        db = SessionLocal()
        try:
            user = db.query(User).filter_by(email=email).first()
            # Get client IP for logging (first entry of X-Forwarded-For if present)
            client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
            if client_ip and ',' in client_ip:
                client_ip = client_ip.split(',')[0].strip()
            # Check if account is locked
            if user and user.locked_until and user.locked_until > datetime.now():
                remaining = (user.locked_until - datetime.now()).seconds // 60 + 1
                security_logger.warning(f"LOCKED_ACCOUNT ip={client_ip} email={email}")
                flash(f'Konto tymczasowo zablokowane. Sprobuj za {remaining} minut.', 'error')
                return render_template('auth/login.html')
            if not user or not check_password_hash(user.password_hash, password):
                logger.warning(f"Failed login attempt for: {email}")
                security_logger.warning(f"FAILED_LOGIN ip={client_ip} email={email}")
                # Log failed login to audit
                log_audit(db, 'login_failed', 'user', entity_name=email,
                          details={'reason': 'invalid_credentials', 'ip': client_ip})
                db.commit()
                # Increment failed attempts if user exists
                if user:
                    user.failed_login_attempts = (user.failed_login_attempts or 0) + 1
                    # Lock account after 5 failed attempts (30 min lockout)
                    if user.failed_login_attempts >= 5:
                        user.locked_until = datetime.now() + timedelta(minutes=30)
                        security_logger.warning(f"ACCOUNT_LOCKED ip={client_ip} email={email} attempts={user.failed_login_attempts}")
                        db.commit()
                        flash('Zbyt wiele nieudanych prob. Konto zablokowane na 30 minut.', 'error')
                        return render_template('auth/login.html')
                    db.commit()
                flash('Nieprawidlowy email lub haslo.', 'error')
                return render_template('auth/login.html')
            if not user.is_active:
                flash('Konto zostalo dezaktywowane.', 'error')
                return render_template('auth/login.html')
            # Require email verification
            if not user.is_verified:
                flash('Musisz potwierdzic adres email przed zalogowaniem. Sprawdz skrzynke.', 'error')
                return render_template('auth/login.html')
            # Reset failed attempts on successful login
            user.failed_login_attempts = 0
            user.locked_until = None
            # Prevent open redirect vulnerability: accept local paths only
            next_page = _safe_next_path(request.args.get('next'))
            # Check if user has 2FA enabled
            if user.totp_enabled and SECURITY_SERVICE_AVAILABLE:
                # Store pending login in session for 2FA verification
                session['2fa_pending_user_id'] = user.id
                session['2fa_remember'] = remember
                if next_page:
                    session['2fa_next'] = next_page
                else:
                    # Drop a target left over from an earlier, abandoned login
                    # so it is not reused for this one.
                    session.pop('2fa_next', None)
                db.commit()
                logger.info(f"2FA required for user: {email}")
                return redirect(url_for('auth.verify_2fa'))
            # No 2FA - login directly
            login_user(user, remember=remember)
            user.last_login = datetime.now()
            _auto_link_person(db, user)
            # Log successful login to audit
            log_audit(db, 'login', 'user', entity_id=user.id, entity_name=user.email,
                      details={'method': 'password', 'ip': client_ip})
            db.commit()
            logger.info(f"User logged in: {email}")
            return redirect(next_page or url_for('dashboard'))
        except Exception as e:
            logger.error(f"Login error: {e}")
            flash('Wystapil blad podczas logowania. Sprobuj ponownie.', 'error')
            return render_template('auth/login.html')
        finally:
            db.close()
    return render_template('auth/login.html')
# ============================================================
# LOGOUT
# ============================================================
@bp.route('/logout')
@login_required
def logout():
    """User logout.

    Writes a 'logout' audit entry, clears the 2FA session flag, logs the
    user out and redirects to the index page. The audit entry is
    best-effort: a failure while writing it is logged and must not prevent
    the user from being logged out.
    """
    # Log logout to audit
    db = SessionLocal()
    try:
        log_audit(db, 'logout', 'user', entity_id=current_user.id,
                  entity_name=current_user.email)
        db.commit()
    except Exception as e:
        db.rollback()
        logger.error(f"Logout audit error: {e}")
    finally:
        db.close()
    # Clear 2FA session flag
    session.pop('2fa_verified', None)
    logout_user()
    flash('Wylogowano pomyslnie.', 'success')
    return redirect(url_for('index'))
# ============================================================
# TWO-FACTOR AUTHENTICATION
# ============================================================
@bp.route('/verify-2fa', methods=['GET', 'POST'])
@limiter.limit("10 per minute")
def verify_2fa():
    """Verify the 2FA code during login.

    Requires a pending login in the session (``2fa_pending_user_id``). GET
    renders the code form. POST checks the submitted TOTP or backup code;
    on success the pending session keys are consumed, the user is logged in
    and redirected to the stored ``2fa_next`` path or the dashboard. An
    invalid code re-renders the form with an error.
    """
    # Check if there's a pending 2FA login
    pending_user_id = session.get('2fa_pending_user_id')
    if not pending_user_id:
        flash('Sesja wygasla. Zaloguj sie ponownie.', 'error')
        return redirect(url_for('login'))
    if request.method == 'POST':
        code = request.form.get('code', '').strip()
        use_backup = request.form.get('use_backup', False)
        if not code:
            flash('Wprowadz kod weryfikacyjny.', 'error')
            return render_template('auth/verify_2fa.html')
        db = SessionLocal()
        try:
            user = db.query(User).get(pending_user_id)
            if not user:
                # Drop every pending-login key, not only the user id, so a
                # stale remember flag / next target cannot leak into a later
                # login on the same session.
                for stale_key in ('2fa_pending_user_id', '2fa_remember', '2fa_next'):
                    session.pop(stale_key, None)
                flash('Uzytkownik nie istnieje.', 'error')
                return redirect(url_for('login'))
            # Verify code
            if SECURITY_SERVICE_AVAILABLE:
                if use_backup:
                    valid = verify_backup_code(user, code, db)
                else:
                    valid = verify_totp(user, code)
            else:
                valid = False
            # Client IP for audit / security logging (first X-Forwarded-For entry)
            client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
            if client_ip and ',' in client_ip:
                client_ip = client_ip.split(',')[0].strip()
            if valid:
                # Clear pending login and log in
                session.pop('2fa_pending_user_id', None)
                remember = session.pop('2fa_remember', False)
                next_page = session.pop('2fa_next', None)
                login_user(user, remember=remember)
                session['2fa_verified'] = True
                user.last_login = datetime.now()
                _auto_link_person(db, user)
                # Log successful 2FA login to audit
                log_audit(db, 'login', 'user', entity_id=user.id, entity_name=user.email,
                          details={'method': '2fa', 'ip': client_ip})
                db.commit()
                logger.info(f"User logged in with 2FA: {user.email}")
                flash('Zalogowano pomyslnie.', 'success')
                return redirect(next_page or url_for('dashboard'))
            else:
                security_logger.warning(f"INVALID_2FA ip={client_ip} user_id={pending_user_id}")
                flash('Nieprawidlowy kod weryfikacyjny.', 'error')
        except Exception as e:
            logger.error(f"2FA verification error: {e}")
            flash('Wystapil blad. Sprobuj ponownie.', 'error')
        finally:
            db.close()
    return render_template('auth/verify_2fa.html')
@bp.route('/settings/2fa', methods=['GET', 'POST'])
@login_required
def settings_2fa():
    """2FA settings - enable/disable.

    GET renders the settings page. POST dispatches on the ``action`` form
    field:

    * ``setup`` - generate a new TOTP secret and show the QR code. Refused
      while 2FA is already enabled, because it would otherwise switch 2FA
      off without the current code that ``disable`` requires.
    * ``verify_setup`` - confirm the code, enable 2FA, show backup codes.
    * ``disable`` - requires a valid current code.
    * ``regenerate_backup`` - requires a valid current code.

    Any unexpected error is logged and redirects to the dashboard.
    """
    db = SessionLocal()
    try:
        user = db.query(User).get(current_user.id)
        if request.method == 'POST':
            action = request.form.get('action')
            if action == 'setup':
                if user.totp_enabled:
                    flash('Uwierzytelnianie dwuskladnikowe jest juz wlaczone. '
                          'Wylacz je, aby skonfigurowac ponownie.', 'error')
                else:
                    # Generate new secret
                    secret = generate_totp_secret() if SECURITY_SERVICE_AVAILABLE else None
                    if secret:
                        user.totp_secret = secret
                        user.totp_enabled = False  # Not enabled until verified
                        db.commit()
                        qr_uri = get_totp_uri(user)
                        return render_template('auth/2fa_setup.html',
                                               qr_uri=qr_uri,
                                               secret=secret)
                    flash('Blad konfiguracji 2FA.', 'error')
            elif action == 'verify_setup':
                # Verify the setup code and enable 2FA
                code = request.form.get('code', '').strip()
                if not SECURITY_SERVICE_AVAILABLE:
                    # get_totp_uri is not importable in this case, so the
                    # retry page below cannot be built.
                    flash('Blad konfiguracji 2FA.', 'error')
                elif verify_totp(user, code):
                    user.totp_enabled = True
                    # Generate backup codes
                    backup_codes = generate_backup_codes(8)
                    user.totp_backup_codes = backup_codes
                    db.commit()
                    # Log audit
                    log_audit(db, '2fa.enabled', 'user', user.id, user.email)
                    db.commit()
                    logger.info(f"2FA enabled for user: {user.email}")
                    return render_template('auth/2fa_backup_codes.html',
                                           backup_codes=backup_codes)
                else:
                    flash('Nieprawidlowy kod. Sprobuj ponownie.', 'error')
                    qr_uri = get_totp_uri(user)
                    return render_template('auth/2fa_setup.html',
                                           qr_uri=qr_uri,
                                           secret=user.totp_secret)
            elif action == 'disable':
                # Require current code to disable
                code = request.form.get('code', '').strip()
                if SECURITY_SERVICE_AVAILABLE and verify_totp(user, code):
                    user.totp_enabled = False
                    user.totp_secret = None
                    user.totp_backup_codes = None
                    db.commit()
                    # Log audit
                    log_audit(db, '2fa.disabled', 'user', user.id, user.email)
                    db.commit()
                    logger.info(f"2FA disabled for user: {user.email}")
                    flash('Uwierzytelnianie dwuskladnikowe zostalo wylaczone.', 'success')
                else:
                    flash('Nieprawidlowy kod. Nie mozna wylaczyc 2FA.', 'error')
            elif action == 'regenerate_backup':
                # Require current code to regenerate backup codes
                code = request.form.get('code', '').strip()
                if SECURITY_SERVICE_AVAILABLE and verify_totp(user, code):
                    backup_codes = generate_backup_codes(8)
                    user.totp_backup_codes = backup_codes
                    db.commit()
                    logger.info(f"Backup codes regenerated for user: {user.email}")
                    return render_template('auth/2fa_backup_codes.html',
                                           backup_codes=backup_codes)
                else:
                    flash('Nieprawidlowy kod. Nie mozna wygenerowac kodow.', 'error')
        return render_template('auth/2fa_settings.html',
                               totp_enabled=user.totp_enabled,
                               backup_codes_count=len(user.totp_backup_codes) if user.totp_backup_codes else 0)
    except Exception as e:
        logger.error(f"2FA settings error: {e}")
        flash('Wystapil blad.', 'error')
        return redirect(url_for('dashboard'))
    finally:
        db.close()
# ============================================================
# PRIVACY SETTINGS
# ============================================================
@bp.route('/settings/privacy', methods=['GET', 'POST'])
@login_required
def settings_privacy():
    """Show or save the user's phone/e-mail visibility and contact preferences.

    GET renders the form (visibility flags default to True while unset).
    POST stores the checkbox values and the optional contact note, then
    redirects back to this page. Errors are logged and redirect to the
    dashboard.
    """
    db = SessionLocal()
    try:
        user = db.query(User).get(current_user.id)

        if request.method != 'POST':
            phone_visible = True if user.privacy_show_phone is None else user.privacy_show_phone
            email_visible = True if user.privacy_show_email is None else user.privacy_show_email
            return render_template('settings/privacy.html',
                                   user=user,
                                   show_phone=phone_visible,
                                   show_email=email_visible)

        form = request.form
        # (model attribute, checkbox field) pairs; a ticked box posts 'on'.
        checkbox_fields = (
            ('privacy_show_phone', 'show_phone'),
            ('privacy_show_email', 'show_email'),
            ('contact_prefer_email', 'prefer_email'),
            ('contact_prefer_phone', 'prefer_phone'),
            ('contact_prefer_portal', 'prefer_portal'),
        )
        for attribute, field in checkbox_fields:
            setattr(user, attribute, form.get(field) == 'on')
        note = form.get('contact_note', '').strip()
        user.contact_note = note or None
        db.commit()

        logger.info(f"Privacy settings updated for user: {user.email}")
        flash('Ustawienia prywatnosci zostaly zapisane.', 'success')
        return redirect(url_for('auth.settings_privacy'))
    except Exception as e:
        logger.error(f"Privacy settings error: {e}")
        flash('Wystapil blad.', 'error')
        return redirect(url_for('dashboard'))
    finally:
        db.close()
# ============================================================
# BLOCK SETTINGS
# ============================================================
@bp.route('/settings/blocks', methods=['GET'])
@login_required
def settings_blocks():
    """List the users blocked by the current user plus the users still blockable."""
    db = SessionLocal()
    try:
        own_blocks = (
            db.query(UserBlock)
            .filter(UserBlock.user_id == current_user.id)
            .all()
        )
        # Candidates exclude everyone already blocked and the user themself.
        excluded_ids = [entry.blocked_user_id for entry in own_blocks]
        excluded_ids.append(current_user.id)
        candidates = (
            db.query(User)
            .filter(User.id.notin_(excluded_ids), User.is_active == True)
            .order_by(User.name)
            .all()
        )
        return render_template('settings/blocks.html',
                               blocks=own_blocks,
                               available_users=candidates)
    finally:
        db.close()
@bp.route('/settings/blocks/add', methods=['POST'])
@login_required
def settings_blocks_add():
    """Block the user given in the ``user_id`` form field (optional ``reason``).

    Rejects a missing id and self-blocking, warns when the block already
    exists, and always ends on the block-settings page.
    """
    target_id = request.form.get('user_id', type=int)
    note = request.form.get('reason', '').strip()
    if not target_id or target_id == current_user.id:
        flash('Nieprawidlowy uzytkownik.', 'error')
        return redirect(url_for('auth.settings_blocks'))

    db = SessionLocal()
    try:
        already_blocked = (
            db.query(UserBlock)
            .filter(UserBlock.user_id == current_user.id,
                    UserBlock.blocked_user_id == target_id)
            .first()
        )
        if already_blocked:
            flash('Ten uzytkownik jest juz zablokowany.', 'warning')
            return redirect(url_for('auth.settings_blocks'))

        db.add(UserBlock(
            user_id=current_user.id,
            blocked_user_id=target_id,
            reason=note or None
        ))
        db.commit()
        logger.info(f"User {current_user.id} blocked user {target_id}")
        flash('Uzytkownik zostal zablokowany.', 'success')
    except Exception as e:
        logger.error(f"Error blocking user: {e}")
        flash('Wystapil blad.', 'error')
    finally:
        db.close()
    return redirect(url_for('auth.settings_blocks'))
@bp.route('/settings/blocks/remove/<int:block_id>', methods=['POST'])
@login_required
def settings_blocks_remove(block_id):
    """Remove one of the current user's blocks, identified by ``block_id``.

    Only a block owned by the current user is looked up. The handler always
    ends on the block-settings page.
    """
    db = SessionLocal()
    try:
        entry = (
            db.query(UserBlock)
            .filter(UserBlock.id == block_id,
                    UserBlock.user_id == current_user.id)
            .first()
        )
        if not entry:
            flash('Blokada nie istnieje.', 'error')
            return redirect(url_for('auth.settings_blocks'))

        # Remember the id for the log line before the row is deleted.
        released_user_id = entry.blocked_user_id
        db.delete(entry)
        db.commit()
        logger.info(f"User {current_user.id} unblocked user {released_user_id}")
        flash('Uzytkownik zostal odblokowany.', 'success')
    except Exception as e:
        logger.error(f"Error unblocking user: {e}")
        flash('Wystapil blad.', 'error')
    finally:
        db.close()
    return redirect(url_for('auth.settings_blocks'))
# ============================================================
# MOJE KONTO - User Account Settings
# ============================================================
@bp.route('/konto')
@login_required
def konto_dane():
    """Render the "my account" page where the user edits personal data."""
    page = render_template('konto/dane.html')
    return page
@bp.route('/konto', methods=['POST'])
@login_required
def konto_dane_post():
    """Persist the name and phone submitted from the account page.

    Empty values are stored as None. The in-memory ``current_user`` object
    is updated as well. The handler always redirects back to the account
    page.
    """
    db = SessionLocal()
    try:
        account = db.query(User).filter_by(id=current_user.id).first()
        if account:
            submitted_name = sanitize_input(request.form.get('name', ''), 255)
            submitted_phone = sanitize_input(request.form.get('phone', ''), 50)
            account.name = submitted_name or None
            account.phone = submitted_phone or None
            db.commit()
            # Mirror the saved values onto the logged-in user object.
            current_user.name = account.name
            current_user.phone = account.phone
            logger.info(f"Profile updated for user: {account.email}")
            flash('Dane zostaly zapisane.', 'success')
    except Exception as e:
        logger.error(f"Profile update error: {e}")
        flash('Wystapil blad podczas zapisywania.', 'error')
    finally:
        db.close()
    return redirect(url_for('auth.konto_dane'))
@bp.route('/konto/avatar', methods=['POST'])
@login_required
def konto_avatar_upload():
    """Upload or replace the user's avatar photo.

    The uploaded file is validated with FileUploadService, rotated according
    to its EXIF orientation, centre-cropped to a square, resized to 300x300
    and saved under ``static/uploads/avatars/<year>/<month>/`` with a random
    name. The previous avatar file is removed only after the new path has
    been committed; if anything fails before the commit, the newly written
    file is removed instead, so the database never points at a missing file.
    The handler always redirects back to the account page.
    """
    from file_upload_service import FileUploadService, _BASE_DIR
    from PIL import Image, ImageOps
    import uuid

    file = request.files.get('avatar')
    if not file or file.filename == '':
        flash('Nie wybrano pliku.', 'error')
        return redirect(url_for('auth.konto_dane'))
    # Validate using existing service
    is_valid, error = FileUploadService.validate_file(file)
    if not is_valid:
        flash(error, 'error')
        return redirect(url_for('auth.konto_dane'))

    static_root = os.path.join(_BASE_DIR, 'static')
    file_path = None      # set once the target path of the new file is known
    committed = False     # True after the new avatar path is stored in the DB
    db = SessionLocal()
    try:
        user = db.query(User).filter_by(id=current_user.id).first()
        if not user:
            flash('Nie znaleziono użytkownika.', 'error')
            return redirect(url_for('auth.konto_dane'))
        # Generate filename and path
        ext = file.filename.rsplit('.', 1)[-1].lower()
        if ext == 'jpeg':
            ext = 'jpg'
        stored_filename = f"{uuid.uuid4()}.{ext}"
        now = datetime.now()
        avatar_dir = os.path.join(static_root, 'uploads', 'avatars', str(now.year), f"{now.month:02d}")
        os.makedirs(avatar_dir, exist_ok=True)
        file_path = os.path.join(avatar_dir, stored_filename)

        # Resize to 300x300 square crop, strip EXIF
        img = Image.open(file)
        # Apply the EXIF orientation before the metadata is dropped on save;
        # otherwise photos taken with a rotated camera end up sideways.
        img = ImageOps.exif_transpose(img)
        if img.mode in ('RGBA', 'LA', 'P') and ext == 'jpg':
            img = img.convert('RGB')
        elif img.mode not in ('RGB', 'RGBA'):
            img = img.convert('RGB')
        # Center crop to square
        w, h = img.size
        side = min(w, h)
        left = (w - side) // 2
        top = (h - side) // 2
        img = img.crop((left, top, left + side, top + side))
        img = img.resize((300, 300), Image.LANCZOS)
        # Save
        save_kwargs = {'optimize': True}
        if ext == 'jpg':
            save_kwargs['quality'] = 85
        img.save(file_path, **save_kwargs)

        # Save relative path first; the old file is removed afterwards.
        previous_relative = user.avatar_path
        user.avatar_path = os.path.relpath(file_path, static_root)
        db.commit()
        committed = True

        # Delete old avatar if exists (best-effort)
        if previous_relative:
            old_path = os.path.join(static_root, previous_relative)
            if os.path.exists(old_path):
                try:
                    os.remove(old_path)
                except OSError as e:
                    logger.warning(f"Could not remove old avatar {old_path}: {e}")

        current_user.avatar_path = user.avatar_path
        logger.info(f"Avatar uploaded for user {user.id}: {user.avatar_path}")
        flash('Zdjęcie profilowe zostało zapisane.', 'success')
    except Exception as e:
        db.rollback()
        # Do not leave an orphaned file behind when the path was never stored.
        if not committed and file_path and os.path.exists(file_path):
            try:
                os.remove(file_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove unsaved avatar {file_path}: {cleanup_error}")
        logger.error(f"Avatar upload error: {e}")
        flash('Wystąpił błąd podczas zapisywania zdjęcia.', 'error')
    finally:
        db.close()
    return redirect(url_for('auth.konto_dane'))
@bp.route('/konto/avatar/delete', methods=['POST'])
@login_required
def konto_avatar_delete():
    """Delete the user's avatar photo.

    The avatar path is cleared and committed first; the file on disk is
    removed afterwards, so a failed commit cannot leave the database
    pointing at a file that no longer exists. The handler always redirects
    back to the account page.
    """
    from file_upload_service import _BASE_DIR
    db = SessionLocal()
    try:
        user = db.query(User).filter_by(id=current_user.id).first()
        if user and user.avatar_path:
            old_path = os.path.join(_BASE_DIR, 'static', user.avatar_path)
            user.avatar_path = None
            db.commit()
            # Delete file (best-effort, after the DB no longer references it)
            if os.path.exists(old_path):
                try:
                    os.remove(old_path)
                except OSError as e:
                    logger.warning(f"Could not remove avatar file {old_path}: {e}")
            current_user.avatar_path = None
            logger.info(f"Avatar deleted for user {user.id}")
            flash('Zdjęcie profilowe zostało usunięte.', 'success')
    except Exception as e:
        db.rollback()
        logger.error(f"Avatar delete error: {e}")
        flash('Wystąpił błąd podczas usuwania zdjęcia.', 'error')
    finally:
        db.close()
    return redirect(url_for('auth.konto_dane'))
@bp.route('/konto/prywatnosc', methods=['GET', 'POST'])
@login_required
def konto_prywatnosc():
    """Account-area privacy page: contact visibility and notification flags.

    GET renders the form (visibility flags default to True while unset).
    POST stores the submitted checkboxes and redirects back here. Errors are
    logged and redirect to the account page.
    """
    db = SessionLocal()
    try:
        user = db.query(User).filter_by(id=current_user.id).first()

        if request.method != 'POST':
            phone_visible = True if user.privacy_show_phone is None else user.privacy_show_phone
            email_visible = True if user.privacy_show_email is None else user.privacy_show_email
            return render_template('konto/prywatnosc.html',
                                   user=user,
                                   show_phone=phone_visible,
                                   show_email=email_visible)

        # (model attribute, checkbox field) pairs; a ticked box posts 'on'.
        checkbox_fields = (
            ('privacy_show_phone', 'show_phone'),
            ('privacy_show_email', 'show_email'),
            ('contact_prefer_email', 'prefer_email'),
            ('contact_prefer_phone', 'prefer_phone'),
            ('contact_prefer_portal', 'prefer_portal'),
            ('notify_email_messages', 'notify_email_messages'),
        )
        for attribute, field in checkbox_fields:
            setattr(user, attribute, request.form.get(field) == 'on')
        db.commit()

        logger.info(f"Privacy settings updated for user: {user.email}")
        flash('Ustawienia prywatności zostały zapisane.', 'success')
        return redirect(url_for('auth.konto_prywatnosc'))
    except Exception as e:
        logger.error(f"Privacy settings error: {e}")
        flash('Wystapil blad.', 'error')
        return redirect(url_for('auth.konto_dane'))
    finally:
        db.close()
@bp.route('/konto/bezpieczenstwo')
@login_required
def konto_bezpieczenstwo():
    """Render the account security page (2FA, password)."""
    page = render_template('konto/bezpieczenstwo.html')
    return page
@bp.route('/konto/blokady')
@login_required
def konto_blokady():
    """Account-area page listing the user's blocks and the users still blockable.

    Errors are logged and redirect to the account page.
    """
    db = SessionLocal()
    try:
        own_blocks = db.query(UserBlock).filter_by(user_id=current_user.id).all()
        # Candidates exclude everyone already blocked and the user themself.
        excluded_ids = [entry.blocked_user_id for entry in own_blocks]
        excluded_ids.append(current_user.id)
        candidates = (
            db.query(User)
            .filter(User.id.notin_(excluded_ids), User.is_active == True)
            .order_by(User.name)
            .all()
        )
        return render_template('konto/blokady.html',
                               blocks=own_blocks,
                               available_users=candidates)
    except Exception as e:
        logger.error(f"Blocks page error: {e}")
        flash('Wystapil blad.', 'error')
        return redirect(url_for('auth.konto_dane'))
    finally:
        db.close()
@bp.route('/konto/blokady/dodaj', methods=['POST'])
@login_required
def konto_blokady_dodaj():
    """Block the user given in the ``user_id`` form field (account area).

    Rejects a missing id and self-blocking, informs when the block already
    exists, and always ends on the account blocks page.
    """
    db = SessionLocal()
    try:
        target_id = request.form.get('user_id', type=int)
        if not target_id or target_id == current_user.id:
            flash('Nieprawidlowy uzytkownik.', 'error')
            return redirect(url_for('auth.konto_blokady'))

        already_blocked = (
            db.query(UserBlock)
            .filter_by(user_id=current_user.id, blocked_user_id=target_id)
            .first()
        )
        if already_blocked:
            flash('Ten uzytkownik jest juz zablokowany.', 'info')
            return redirect(url_for('auth.konto_blokady'))

        db.add(UserBlock(user_id=current_user.id, blocked_user_id=target_id))
        db.commit()
        logger.info(f"User {current_user.id} blocked user {target_id}")
        flash('Uzytkownik zostal zablokowany.', 'success')
    except Exception as e:
        logger.error(f"Error blocking user: {e}")
        flash('Wystapil blad.', 'error')
    finally:
        db.close()
    return redirect(url_for('auth.konto_blokady'))
@bp.route('/konto/integracje')
@login_required
def konto_integracje():
    """OAuth integrations page for MANAGER+.

    Lets a manager connect Google/Meta accounts for the company they are
    assigned to. Users without the MANAGER role, without a company, or whose
    company record is missing are redirected to the account page.
    """
    if not current_user.has_role(SystemRole.MANAGER):
        flash('Ta strona wymaga uprawnień kadry zarządzającej.', 'error')
        return redirect(url_for('auth.konto_dane'))
    if not current_user.company_id:
        flash('Musisz byc przypisany do firmy, aby korzystac z integracji.', 'info')
        return redirect(url_for('auth.konto_dane'))

    db = SessionLocal()
    try:
        own_company = (
            db.query(Company)
            .filter(Company.id == current_user.company_id)
            .first()
        )
        if not own_company:
            flash('Firma nie istnieje.', 'error')
            return redirect(url_for('auth.konto_dane'))

        from oauth_service import OAuthService
        service = OAuthService()
        linked = service.get_connected_services(db, current_user.company_id)
        # A provider counts as available when its client/app id is set.
        providers = {
            'google': bool(service.google_client_id),
            'meta': bool(service.meta_app_id),
        }
        return render_template(
            'admin/company_settings.html',
            company=own_company,
            connections=linked,
            oauth_available=providers,
            is_user_view=True,
        )
    finally:
        db.close()
@bp.route('/konto/blokady/usun/<int:block_id>', methods=['POST'])
@login_required
def konto_blokady_usun(block_id):
    """Remove one of the current user's blocks (account area).

    Only a block owned by the current user is looked up. The handler always
    ends on the account blocks page.
    """
    db = SessionLocal()
    try:
        entry = (
            db.query(UserBlock)
            .filter(UserBlock.id == block_id,
                    UserBlock.user_id == current_user.id)
            .first()
        )
        if not entry:
            flash('Blokada nie istnieje.', 'error')
            return redirect(url_for('auth.konto_blokady'))

        # Remember the id for the log line before the row is deleted.
        released_user_id = entry.blocked_user_id
        db.delete(entry)
        db.commit()
        logger.info(f"User {current_user.id} unblocked user {released_user_id}")
        flash('Uzytkownik zostal odblokowany.', 'success')
    except Exception as e:
        logger.error(f"Error unblocking user: {e}")
        flash('Wystapil blad.', 'error')
    finally:
        db.close()
    return redirect(url_for('auth.konto_blokady'))
# ============================================================
# PASSWORD RESET
# ============================================================
@bp.route('/forgot-password', methods=['GET', 'POST'])
@limiter.limit("20 per hour")
def forgot_password():
    """Show the password-reset request form and handle its submission.

    On POST, an active user matching the submitted email gets a fresh reset
    token valid for one hour, and a reset link is emailed when the email
    service is configured. The same flash message is shown whether or not
    the account exists, so the response does not reveal registered emails.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    # Anything other than a form submission just displays the form.
    if request.method != 'POST':
        return render_template('auth/forgot_password.html')

    email = sanitize_input(request.form.get('email', ''), 255)
    if not validate_email(email):
        flash('Nieprawidlowy format adresu email.', 'error')
        return render_template('auth/forgot_password.html')

    db = SessionLocal()
    try:
        user = db.query(User).filter_by(email=email, is_active=True).first()

        if user:
            # Store a new token that expires one hour from now.
            reset_token = secrets.token_urlsafe(32)
            user.reset_token = reset_token
            user.reset_token_expires = datetime.now() + timedelta(hours=1)
            db.commit()

            base_url = os.getenv('APP_URL', 'https://nordabiznes.pl')
            reset_url = f"{base_url}/reset-password/{reset_token}"

            # Delivery is best effort: failures are logged, never shown to the user.
            try:
                import email_service

                if not email_service.is_configured():
                    logger.warning("Email service not configured")
                    logger.info(f"Reset token (no email) for {email}: {reset_token[:8]}...")
                elif email_service.send_password_reset_email(email, reset_url):
                    logger.info(f"Password reset email sent to {email}")
                else:
                    logger.warning(f"Failed to send password reset email to {email}")
                    logger.info(f"Reset token (email failed) for {email}: {reset_token[:8]}...")
            except Exception as e:
                logger.error(f"Error sending reset email: {e}")
                logger.info(f"Reset token (exception) for {email}: {reset_token[:8]}...")

        # Identical response for known and unknown emails (no enumeration).
        flash('Jesli email istnieje w systemie, instrukcje resetowania hasla zostaly wyslane.', 'info')
        return redirect(url_for('login'))
    except Exception as e:
        logger.error(f"Password reset error: {e}")
        flash('Wystapil blad. Sprobuj ponownie.', 'error')
    finally:
        db.close()

    # Reached only after an unexpected error: show the form again.
    return render_template('auth/forgot_password.html')
@bp.route('/reset-password/<token>', methods=['GET', 'POST'])
@limiter.limit("30 per hour")
def reset_password(token):
    """Set a new password for the account identified by a reset token.

    The token must belong to an active user and must not be expired;
    otherwise the visitor is sent back to the reset-request page. GET shows
    the form. POST validates the two password fields, stores the new hash,
    clears the reset token and marks the email as verified if it was not
    already. When the user has never logged in, a notification email is
    sent to the administrator on a best-effort basis.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    db = SessionLocal()
    try:
        # Only an unexpired token of an active account is accepted.
        user = (
            db.query(User)
            .filter(
                User.reset_token == token,
                User.reset_token_expires > datetime.now(),
                User.is_active == True,
            )
            .first()
        )
        if not user:
            flash('Link resetowania hasla jest nieprawidlowy lub wygasl.', 'error')
            return redirect(url_for('auth.forgot_password'))

        if request.method != 'POST':
            return render_template('auth/reset_password.html', token=token)

        password = request.form.get('password', '')
        password_confirm = request.form.get('password_confirm', '')

        # Both fields must agree before strength is even considered.
        if password != password_confirm:
            flash('Hasla nie sa identyczne.', 'error')
            return render_template('auth/reset_password.html', token=token)

        password_valid, password_message = validate_password(password)
        if not password_valid:
            flash(password_message, 'error')
            return render_template('auth/reset_password.html', token=token)

        # A user who has never logged in is activating the account right now.
        first_activation = user.last_login is None

        # Store the new hash and make the reset link single-use.
        user.password_hash = generate_password_hash(password, method='pbkdf2:sha256')
        user.reset_token = None
        user.reset_token_expires = None

        # Using the emailed link proves inbox access, so verify the email too.
        if not user.is_verified:
            user.is_verified = True
            user.verified_at = datetime.now()
            user.verification_token = None
            user.verification_token_expires = None
            logger.info(f"Email auto-verified via password reset for {user.email}")

        db.commit()
        logger.info(f"Password reset successful for {user.email}")

        if first_activation:
            # Admin notification must never break the reset itself.
            try:
                import email_service

                email_service.send_email(
                    to=['maciej.pienczyn@inpi.pl'],
                    subject=f'Aktywacja konta: {user.name}',
                    body_text=f'{user.name} ({user.email}) właśnie ustawił(a) hasło i aktywował(a) swoje konto w portalu Norda Biznes Partner.',
                    email_type='admin_notification',
                    bcc=[]
                )
            except Exception as notify_err:
                logger.warning(f"Failed to send activation notification: {notify_err}")

        flash('Haslo zostalo zmienione. Mozesz sie teraz zalogowac.', 'success')
        return redirect(url_for('login'))
    except Exception as e:
        logger.error(f"Reset password error: {e}")
        flash('Wystapil blad. Sprobuj ponownie.', 'error')
        return redirect(url_for('auth.forgot_password'))
    finally:
        db.close()
# ============================================================
# EMAIL VERIFICATION
# ============================================================
@bp.route('/verify-email/<token>')
def verify_email(token):
    """Verify an email address from a tokenized link and sign the user in.

    The token must belong to an active user and must not be expired;
    otherwise the visitor is redirected to the login page with an error.

    Outcomes for a valid token:

    * user not yet verified: the account is marked verified, the user is
      logged in, and ``email_verified`` and ``login`` audit entries are
      recorded;
    * user already verified and nobody is logged in: the user is logged in
      again and a ``login`` audit entry is recorded;
    * user already verified and a session exists: only an informational
      flash is shown.

    The token is intentionally not cleared here, so a second click (or an
    email client prefetching the link) does not produce an "invalid link"
    error; it simply stops matching once ``verification_token_expires``
    has passed.
    """
    db = SessionLocal()
    try:
        user = db.query(User).filter(
            User.verification_token == token,
            User.verification_token_expires > datetime.now(),
            User.is_active == True
        ).first()

        if not user:
            flash('Link weryfikacyjny jest nieprawidlowy lub wygasl.', 'error')
            return redirect(url_for('login'))

        # Resolve the client address once; both login paths below record it.
        # With a comma-separated X-Forwarded-For value, keep the first entry.
        client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
        if client_ip and ',' in client_ip:
            client_ip = client_ip.split(',')[0].strip()

        if user.is_verified:
            # Already verified - auto-login if not logged in
            if not current_user.is_authenticated:
                # NOTE(review): because the token is kept until it expires,
                # this branch lets the link act as a password-less login for
                # the token's lifetime (login_user is called directly here,
                # with no password or second-factor step in this function).
                # Confirm this is acceptable for accounts with 2FA enabled.
                login_user(user, remember=True)
                user.last_login = datetime.now()
                # Record this login in the audit trail, exactly like the
                # first-verification path does; previously repeat logins
                # through the link left no audit entry.
                log_audit(db, 'login', 'user', entity_id=user.id, entity_name=user.email,
                          details={'method': 'email_verification', 'ip': client_ip})
                db.commit()
                logger.info(f"Auto-logged in via verification link (already verified): {user.email}")
                flash('Witamy ponownie! Zostales automatycznie zalogowany.', 'info')
                return redirect(url_for('dashboard'))
            flash('Email zostal juz zweryfikowany.', 'info')
            return redirect(url_for('dashboard'))

        # Verify user (keep token until natural expiry so double-clicks
        # and email client prefetch don't show a confusing error)
        user.is_verified = True
        user.verified_at = datetime.now()

        # Auto-login the user after verification
        login_user(user, remember=True)
        user.last_login = datetime.now()

        # Log successful verification and login
        log_audit(db, 'email_verified', 'user', entity_id=user.id, entity_name=user.email,
                  details={'ip': client_ip})
        log_audit(db, 'login', 'user', entity_id=user.id, entity_name=user.email,
                  details={'method': 'email_verification', 'ip': client_ip})
        db.commit()

        logger.info(f"Email verified and auto-logged in: {user.email}")
        flash('Email zweryfikowany! Witamy w Norda Biznes Partner.', 'success')
        return redirect(url_for('dashboard'))
    except Exception as e:
        logger.error(f"Email verification error: {e}")
        flash('Wystapil blad podczas weryfikacji.', 'error')
        return redirect(url_for('login'))
    finally:
        db.close()
@bp.route('/resend-verification', methods=['GET', 'POST'])
@limiter.limit("15 per hour")
def resend_verification():
    """Show the "resend verification email" form and handle its submission.

    On POST, an active and not yet verified user matching the submitted
    email gets a fresh verification token valid for 24 hours, and the
    welcome email carrying the new link is sent when the email service is
    configured. The same flash message is shown regardless of whether such
    an account exists, so the response does not reveal registered emails.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    # Anything other than a form submission just displays the form.
    if request.method != 'POST':
        return render_template('auth/resend_verification.html')

    email = sanitize_input(request.form.get('email', ''), 255)
    if not validate_email(email):
        flash('Nieprawidlowy format adresu email.', 'error')
        return render_template('auth/resend_verification.html')

    db = SessionLocal()
    try:
        user = db.query(User).filter_by(email=email, is_active=True).first()

        if user and not user.is_verified:
            # Replace any previous token with one that lasts 24 hours.
            verification_token = secrets.token_urlsafe(32)
            user.verification_token = verification_token
            user.verification_token_expires = datetime.now() + timedelta(hours=24)
            db.commit()

            base_url = os.getenv('APP_URL', 'https://nordabiznes.pl')
            verification_url = f"{base_url}/verify-email/{verification_token}"

            # Delivery is best effort: failures are logged, never shown to the user.
            try:
                import email_service

                if not email_service.is_configured():
                    logger.warning("Email service not configured")
                    logger.info(f"Resend verification token (no email) for {email}: {verification_token[:8]}...")
                elif email_service.send_welcome_email(email, user.name, verification_url):
                    logger.info(f"Verification email resent to {email}")
                else:
                    logger.warning(f"Failed to resend verification email to {email}")
                    logger.info(f"Resend verification token (email failed) for {email}: {verification_token[:8]}...")
            except Exception as e:
                logger.error(f"Error resending verification email: {e}")
                logger.info(f"Resend verification token (exception) for {email}: {verification_token[:8]}...")

        # Identical response for every submitted email (no enumeration).
        flash('Jesli konto istnieje i nie zostalo zweryfikowane, email weryfikacyjny zostal wyslany.', 'info')
        return redirect(url_for('login'))
    except Exception as e:
        logger.error(f"Resend verification error: {e}")
        flash('Wystapil blad. Sprobuj ponownie.', 'error')
    finally:
        db.close()

    # Reached only after an unexpected error: show the form again.
    return render_template('auth/resend_verification.html')