nordabiz/blueprints/auth/routes.py
Maciej Pienczyn d656f75f35
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
fix: Use correct field name is_verified instead of email_verified
2026-02-02 13:19:54 +01:00

1129 lines
42 KiB
Python

"""
Auth Routes
============
Authentication routes: login, logout, register, password reset, email verification, 2FA.
"""
import os
import re
import logging
import secrets
from datetime import datetime, timedelta
from flask import render_template, request, redirect, url_for, flash, session, current_app
from flask_login import login_required, login_user, logout_user, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from . import bp
from database import SessionLocal, User, Company, UserBlock
from utils.helpers import sanitize_input, validate_email, validate_password
from extensions import limiter
from security_service import log_audit
# Module logger, named after this module.
logger = logging.getLogger(__name__)
# Security logger (same name as in app.py for fail2ban integration)
security_logger = logging.getLogger('security')
# Security service availability check: the 2FA helpers are optional, and
# SECURITY_SERVICE_AVAILABLE records whether they could be imported.
# NOTE(review): log_audit is also imported unconditionally from
# security_service at the top of this module, so a missing security_service
# would fail there first - confirm whether this fallback is reachable.
try:
    from security_service import (
        generate_totp_secret, get_totp_uri, verify_totp,
        generate_backup_codes, verify_backup_code, log_audit
    )
    SECURITY_SERVICE_AVAILABLE = True
except ImportError:
    SECURITY_SERVICE_AVAILABLE = False
    logger.warning("Security service not available in auth blueprint")
def _send_registration_notification(user_info):
    """Email a notification about a newly registered user.

    Best-effort: every failure (missing email service, send error) is logged
    and swallowed so it never interrupts the registration flow.

    Args:
        user_info: Mapping with optional keys ``name``, ``email``,
            ``company_nip``, ``company_name`` and ``is_norda_member``.

    Returns:
        None.
    """
    try:
        from email_service import send_email, is_configured

        if not is_configured():
            logger.warning("Email service not configured - skipping registration notification")
            return

        # An explicitly empty ERROR_NOTIFY_EMAIL disables the notification.
        notify_email = os.getenv('ERROR_NOTIFY_EMAIL', 'maciej.pienczyn@inpi.pl')
        if not notify_email:
            return

        reg_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        is_member = "TAK" if user_info.get('is_norda_member') else "NIE"
        # The caller passes company_name=None when no company matched, so a
        # dict.get() default would never apply; fall back on any falsy value.
        company_name = user_info.get('company_name') or 'Brak przypisanej firmy'
        subject = f"NordaBiz: Nowa rejestracja - {user_info.get('name', 'Nieznany')}"
        body_text = f"""NOWA REJESTRACJA NA NORDABIZNES.PL
{'='*50}
Czas: {reg_time}
Imie: {user_info.get('name', 'N/A')}
Email: {user_info.get('email', 'N/A')}
NIP: {user_info.get('company_nip', 'N/A')}
Firma: {company_name}
Czlonek NORDA: {is_member}
{'='*50}
"""
        send_email(
            to_email=notify_email,
            subject=subject,
            body_text=body_text
        )
        logger.info(f"Registration notification sent for {user_info.get('email')}")
    except Exception as e:
        # Deliberately broad: the notification must never break registration.
        logger.error(f"Failed to send registration notification: {e}")
# ============================================================
# REGISTRATION
# ============================================================
@bp.route('/register', methods=['GET', 'POST'])
@limiter.limit("50 per hour;200 per day")
def register():
    """Show the registration form and, on POST, create an unverified account.

    The POST path validates email, password, required fields and the NIP
    format, rejects an already registered email, links the user to an active
    company with a matching NIP, stores a 24-hour verification token, tries to
    send the verification email and an admin notification, and finally
    redirects to the registration-success page.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    form_template = 'auth/register.html'
    if request.method != 'POST':
        return render_template(form_template)

    def reject(message):
        # Flash an error and re-render the form.
        flash(message, 'error')
        return render_template(form_template)

    form = request.form
    email = sanitize_input(form.get('email', ''), 255)
    password = form.get('password', '')
    name = sanitize_input(form.get('name', ''), 255)
    company_nip = sanitize_input(form.get('company_nip', ''), 10)

    # Input validation, in the same order as the checks are reported.
    if not validate_email(email):
        return reject('Nieprawidlowy format adresu email.')

    password_ok, password_problem = validate_password(password)
    if not password_ok:
        return reject(password_problem)

    if not (name and email and company_nip):
        return reject('Imie, email i NIP firmy sa wymagane.')

    if not re.match(r'^\d{10}$', company_nip):
        return reject('NIP musi skladac sie z 10 cyfr.')

    db = SessionLocal()
    try:
        # Refuse duplicate accounts.
        if db.query(User).filter_by(email=email).first():
            return reject('Email juz jest zarejestrowany.')

        # A matching active company makes the user a NORDA member.
        # (The NIP was already validated above, so the lookup always runs.)
        company = db.query(Company).filter_by(nip=company_nip, status='active').first()
        if company:
            is_norda_member, company_id = True, company.id
        else:
            is_norda_member, company_id = False, None

        # Verification token valid for 24 hours.
        verification_token = secrets.token_urlsafe(32)
        verification_expires = datetime.now() + timedelta(hours=24)

        new_user = User(
            email=email,
            password_hash=generate_password_hash(password, method='pbkdf2:sha256'),
            name=name,
            company_nip=company_nip,
            company_id=company_id,
            is_norda_member=is_norda_member,
            created_at=datetime.now(),
            is_active=True,
            is_verified=False,
            verification_token=verification_token,
            verification_token_expires=verification_expires
        )
        db.add(new_user)
        db.commit()

        base_url = os.getenv('APP_URL', 'https://nordabiznes.pl')
        verification_url = f"{base_url}/verify-email/{verification_token}"

        # Sending the verification email is best-effort; only a token prefix
        # is written to the log when delivery does not happen.
        try:
            import email_service
            if not email_service.is_configured():
                logger.warning("Email service not configured")
                logger.info(f"Verification token (no email) for {email}: {verification_token[:8]}...")
            elif email_service.send_welcome_email(email, name, verification_url):
                logger.info(f"Verification email sent to {email}")
            else:
                logger.warning(f"Failed to send verification email to {email}")
                logger.info(f"Verification token (email failed) for {email}: {verification_token[:8]}...")
        except Exception as e:
            logger.error(f"Error sending verification email: {e}")
            logger.info(f"Verification token (exception) for {email}: {verification_token[:8]}...")

        logger.info(f"New user registered: {email}")

        # Tell the admin about the new registration (also best-effort).
        try:
            company_name = company.name if company_id and company else None
            _send_registration_notification({
                'name': name,
                'email': email,
                'company_nip': company_nip,
                'company_name': company_name,
                'is_norda_member': is_norda_member
            })
        except Exception as e:
            logger.error(f"Failed to send registration notification: {e}")

        # The success page reads the email back from the session.
        session['registered_email'] = email
        return redirect(url_for('auth.registration_success'))
    except Exception as e:
        logger.error(f"Registration error: {e}")
        return reject('Wystapil blad podczas rejestracji. Sprobuj ponownie.')
    finally:
        db.close()
# ============================================================
# REGISTRATION SUCCESS
# ============================================================
@bp.route('/registration-success')
def registration_success():
    """Render the post-registration page for the email stored in the session.

    The email is popped from the session, so the page is shown once; without
    it the visitor is redirected to the registration form.
    """
    registered_email = session.pop('registered_email', None)
    if registered_email:
        return render_template('auth/registration_success.html', email=registered_email)
    # Nothing stored by register() - send the visitor back to the form.
    return redirect(url_for('auth.register'))
@bp.route('/check-verification-status')
def check_verification_status():
    """Return JSON telling whether the account for ``?email=`` is verified.

    Responses: 400 when no email is given, 404 when no user matches,
    otherwise ``{'verified': bool}`` plus a ``redirect`` URL once verified.
    """
    # NOTE(review): this endpoint is unauthenticated and distinguishes
    # "user not found" from "not verified" - confirm that exposing account
    # existence this way is acceptable.
    from flask import jsonify

    email = request.args.get('email', '')
    if not email:
        return jsonify({'verified': False, 'error': 'No email provided'}), 400

    db = SessionLocal()
    try:
        account = db.query(User).filter(User.email == email).first()
        if not account:
            payload, status = {'verified': False, 'error': 'User not found'}, 404
        elif account.is_verified:
            payload, status = {'verified': True, 'redirect': url_for('dashboard')}, 200
        else:
            payload, status = {'verified': False}, 200
        return jsonify(payload), status
    finally:
        db.close()
# ============================================================
# LOGIN
# ============================================================
def _is_safe_next_path(target):
    """Return True when ``target`` is a local path that is safe to redirect to.

    A leading '/' alone is not enough: '//host' and '/\\host' are resolved by
    browsers as URLs on another host, and embedded control characters can be
    stripped by the browser to produce such a prefix.
    """
    if not target or not target.startswith('/'):
        return False
    if target.startswith('//') or '\\' in target:
        return False
    return not any(ord(ch) < 0x20 for ch in target)


@bp.route('/login', methods=['GET', 'POST'])
@limiter.limit("1000 per hour" if os.getenv('FLASK_ENV') == 'development' else "60 per minute;500 per hour")
def login():
    """Authenticate a user by email and password.

    On POST: refuses locked accounts, counts failed attempts (locking the
    account for 30 minutes after 5 failures), refuses inactive or unverified
    accounts, hands over to the 2FA step when TOTP is enabled, and otherwise
    logs the user in and redirects to a validated ``next`` path or the
    dashboard. On GET the login form is rendered.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method == 'POST':
        email = sanitize_input(request.form.get('email', ''), 255)
        password = request.form.get('password', '')
        remember = request.form.get('remember', False) == 'on'

        # Basic validation
        if not email or not password:
            flash('Email i haslo sa wymagane.', 'error')
            return render_template('auth/login.html')

        db = SessionLocal()
        try:
            user = db.query(User).filter_by(email=email).first()

            # Client IP for logging only.
            # NOTE(review): X-Forwarded-For is client-supplied unless a
            # trusted proxy overwrites it - confirm the deployment does so.
            client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
            if client_ip and ',' in client_ip:
                client_ip = client_ip.split(',')[0].strip()

            # Check if account is locked
            now = datetime.now()
            if user and user.locked_until and user.locked_until > now:
                # total_seconds() stays correct for any lock length; the
                # result is rounded up to whole minutes.
                remaining = int((user.locked_until - now).total_seconds()) // 60 + 1
                security_logger.warning(f"LOCKED_ACCOUNT ip={client_ip} email={email}")
                flash(f'Konto tymczasowo zablokowane. Sprobuj za {remaining} minut.', 'error')
                return render_template('auth/login.html')

            if not user or not check_password_hash(user.password_hash, password):
                logger.warning(f"Failed login attempt for: {email}")
                security_logger.warning(f"FAILED_LOGIN ip={client_ip} email={email}")

                # Log failed login to audit
                log_audit(db, 'login_failed', 'user', entity_name=email,
                          details={'reason': 'invalid_credentials', 'ip': client_ip})
                db.commit()

                # Increment failed attempts if user exists
                if user:
                    user.failed_login_attempts = (user.failed_login_attempts or 0) + 1
                    # Lock account after 5 failed attempts (30 min lockout)
                    if user.failed_login_attempts >= 5:
                        user.locked_until = datetime.now() + timedelta(minutes=30)
                        security_logger.warning(f"ACCOUNT_LOCKED ip={client_ip} email={email} attempts={user.failed_login_attempts}")
                        db.commit()
                        flash('Zbyt wiele nieudanych prob. Konto zablokowane na 30 minut.', 'error')
                        return render_template('auth/login.html')
                    db.commit()

                flash('Nieprawidlowy email lub haslo.', 'error')
                return render_template('auth/login.html')

            if not user.is_active:
                flash('Konto zostalo dezaktywowane.', 'error')
                return render_template('auth/login.html')

            # Require email verification
            if not user.is_verified:
                flash('Musisz potwierdzic adres email przed zalogowaniem. Sprawdz skrzynke.', 'error')
                return render_template('auth/login.html')

            # Reset failed attempts on successful login
            user.failed_login_attempts = 0
            user.locked_until = None

            next_page = request.args.get('next')
            # Prevent open redirects: only same-site paths are accepted.
            if not _is_safe_next_path(next_page):
                next_page = None

            # Check if user has 2FA enabled
            if user.totp_enabled and SECURITY_SERVICE_AVAILABLE:
                # Store pending login in session for 2FA verification
                session['2fa_pending_user_id'] = user.id
                session['2fa_remember'] = remember
                if next_page:
                    session['2fa_next'] = next_page
                db.commit()
                logger.info(f"2FA required for user: {email}")
                return redirect(url_for('auth.verify_2fa'))

            # No 2FA - login directly
            login_user(user, remember=remember)
            user.last_login = datetime.now()

            # Log successful login to audit
            log_audit(db, 'login', 'user', entity_id=user.id, entity_name=user.email,
                      details={'method': 'password', 'ip': client_ip})
            db.commit()
            logger.info(f"User logged in: {email}")

            return redirect(next_page or url_for('dashboard'))
        except Exception as e:
            logger.error(f"Login error: {e}")
            flash('Wystapil blad podczas logowania. Sprobuj ponownie.', 'error')
            return render_template('auth/login.html')
        finally:
            db.close()

    return render_template('auth/login.html')
# ============================================================
# LOGOUT
# ============================================================
@bp.route('/logout')
@login_required
def logout():
    """Record the logout in the audit log, end the session and go to the index page."""
    audit_session = SessionLocal()
    try:
        # The audit entry is written while current_user is still populated.
        log_audit(
            audit_session,
            'logout',
            'user',
            entity_id=current_user.id,
            entity_name=current_user.email,
        )
        audit_session.commit()
    finally:
        audit_session.close()

    # Drop the 2FA marker before flask-login clears the user.
    session.pop('2fa_verified', None)
    logout_user()

    flash('Wylogowano pomyslnie.', 'success')
    return redirect(url_for('index'))
# ============================================================
# TWO-FACTOR AUTHENTICATION
# ============================================================
@bp.route('/verify-2fa', methods=['GET', 'POST'])
@limiter.limit("10 per minute")
def verify_2fa():
    """Complete a password login by checking a TOTP or backup code.

    Requires ``2fa_pending_user_id`` in the session (set by the login view).
    A valid code logs the user in and redirects to the stored ``2fa_next``
    path or the dashboard; otherwise the code form is shown again.
    """
    form_template = 'auth/verify_2fa.html'

    pending_user_id = session.get('2fa_pending_user_id')
    if not pending_user_id:
        flash('Sesja wygasla. Zaloguj sie ponownie.', 'error')
        return redirect(url_for('login'))

    if request.method != 'POST':
        return render_template(form_template)

    code = request.form.get('code', '').strip()
    use_backup = request.form.get('use_backup', False)
    if not code:
        flash('Wprowadz kod weryfikacyjny.', 'error')
        return render_template(form_template)

    def requester_ip():
        # First entry of X-Forwarded-For, else the socket peer address.
        ip = request.headers.get('X-Forwarded-For', request.remote_addr)
        if ip and ',' in ip:
            ip = ip.split(',')[0].strip()
        return ip

    db = SessionLocal()
    try:
        user = db.query(User).get(pending_user_id)
        if not user:
            session.pop('2fa_pending_user_id', None)
            flash('Uzytkownik nie istnieje.', 'error')
            return redirect(url_for('login'))

        # Without the security service no code can ever be accepted.
        if not SECURITY_SERVICE_AVAILABLE:
            code_ok = False
        elif use_backup:
            code_ok = verify_backup_code(user, code, db)
        else:
            code_ok = verify_totp(user, code)

        if not code_ok:
            security_logger.warning(f"INVALID_2FA ip={requester_ip()} user_id={pending_user_id}")
            flash('Nieprawidlowy kod weryfikacyjny.', 'error')
        else:
            # Consume the pending-login state and finish the login.
            session.pop('2fa_pending_user_id', None)
            remember = session.pop('2fa_remember', False)
            next_page = session.pop('2fa_next', None)

            login_user(user, remember=remember)
            session['2fa_verified'] = True
            user.last_login = datetime.now()

            log_audit(db, 'login', 'user', entity_id=user.id, entity_name=user.email,
                      details={'method': '2fa', 'ip': requester_ip()})
            db.commit()

            logger.info(f"User logged in with 2FA: {user.email}")
            flash('Zalogowano pomyslnie.', 'success')
            return redirect(next_page or url_for('dashboard'))
    except Exception as e:
        logger.error(f"2FA verification error: {e}")
        flash('Wystapil blad. Sprobuj ponownie.', 'error')
    finally:
        db.close()

    return render_template(form_template)
@bp.route('/settings/2fa', methods=['GET', 'POST'])
@login_required
def settings_2fa():
    """Manage two-factor authentication for the logged-in user.

    POST ``action`` values:
      * ``setup`` - generate a new TOTP secret and show the setup page.
        Refused while 2FA is enabled, so it cannot be used to switch 2FA off
        without a valid code.
      * ``verify_setup`` - confirm the setup code, enable 2FA and show
        freshly generated backup codes.
      * ``disable`` - turn 2FA off; requires a valid current code.
      * ``regenerate_backup`` - issue new backup codes; requires a valid code.

    Any other request renders the 2FA settings page. Unexpected errors are
    logged and redirect to the dashboard.
    """
    db = SessionLocal()
    try:
        user = db.query(User).get(current_user.id)

        if request.method == 'POST':
            action = request.form.get('action')

            if action == 'setup':
                if user.totp_enabled:
                    # Re-running setup would reset the secret and clear
                    # totp_enabled, bypassing the code check that 'disable'
                    # enforces.
                    flash('2FA jest juz wlaczone. Aby skonfigurowac ponownie, najpierw je wylacz.', 'error')
                else:
                    # Generate new secret
                    if SECURITY_SERVICE_AVAILABLE:
                        secret = generate_totp_secret()
                        if secret:
                            user.totp_secret = secret
                            user.totp_enabled = False  # Not enabled until verified
                            db.commit()
                            qr_uri = get_totp_uri(user)
                            return render_template('auth/2fa_setup.html',
                                                   qr_uri=qr_uri,
                                                   secret=secret)
                    flash('Blad konfiguracji 2FA.', 'error')

            elif action == 'verify_setup':
                # Verify the setup code and enable 2FA
                code = request.form.get('code', '').strip()
                if not SECURITY_SERVICE_AVAILABLE:
                    # get_totp_uri/verify_totp are not importable in this case.
                    flash('Blad konfiguracji 2FA.', 'error')
                elif verify_totp(user, code):
                    user.totp_enabled = True
                    # Generate backup codes
                    backup_codes = generate_backup_codes(8)
                    user.totp_backup_codes = backup_codes
                    db.commit()

                    log_audit(db, '2fa.enabled', 'user', user.id, user.email)
                    db.commit()

                    logger.info(f"2FA enabled for user: {user.email}")
                    return render_template('auth/2fa_backup_codes.html',
                                           backup_codes=backup_codes)
                else:
                    flash('Nieprawidlowy kod. Sprobuj ponownie.', 'error')
                    qr_uri = get_totp_uri(user)
                    return render_template('auth/2fa_setup.html',
                                           qr_uri=qr_uri,
                                           secret=user.totp_secret)

            elif action == 'disable':
                # Require current code to disable
                code = request.form.get('code', '').strip()
                if SECURITY_SERVICE_AVAILABLE and verify_totp(user, code):
                    user.totp_enabled = False
                    user.totp_secret = None
                    user.totp_backup_codes = None
                    db.commit()

                    log_audit(db, '2fa.disabled', 'user', user.id, user.email)
                    db.commit()

                    logger.info(f"2FA disabled for user: {user.email}")
                    flash('Uwierzytelnianie dwuskladnikowe zostalo wylaczone.', 'success')
                else:
                    flash('Nieprawidlowy kod. Nie mozna wylaczyc 2FA.', 'error')

            elif action == 'regenerate_backup':
                # Require current code to regenerate backup codes
                code = request.form.get('code', '').strip()
                if SECURITY_SERVICE_AVAILABLE and verify_totp(user, code):
                    backup_codes = generate_backup_codes(8)
                    user.totp_backup_codes = backup_codes
                    db.commit()
                    logger.info(f"Backup codes regenerated for user: {user.email}")
                    return render_template('auth/2fa_backup_codes.html',
                                           backup_codes=backup_codes)
                else:
                    flash('Nieprawidlowy kod. Nie mozna wygenerowac kodow.', 'error')

        return render_template('auth/2fa_settings.html',
                               totp_enabled=user.totp_enabled,
                               backup_codes_count=len(user.totp_backup_codes) if user.totp_backup_codes else 0)
    except Exception as e:
        logger.error(f"2FA settings error: {e}")
        flash('Wystapil blad.', 'error')
        return redirect(url_for('dashboard'))
    finally:
        db.close()
# ============================================================
# PRIVACY SETTINGS
# ============================================================
@bp.route('/settings/privacy', methods=['GET', 'POST'])
@login_required
def settings_privacy():
    """Show and save the user's privacy flags, contact preferences and contact note.

    POST stores the submitted checkboxes and note, then redirects back to
    this page; GET renders the form. Errors redirect to the dashboard.
    """
    # Model attribute -> checkbox field name; a checkbox counts as set only
    # when the browser submits the value 'on'.
    checkbox_fields = (
        ('privacy_show_phone', 'show_phone'),
        ('privacy_show_email', 'show_email'),
        ('contact_prefer_email', 'prefer_email'),
        ('contact_prefer_phone', 'prefer_phone'),
        ('contact_prefer_portal', 'prefer_portal'),
    )

    db = SessionLocal()
    try:
        user = db.query(User).get(current_user.id)

        if request.method == 'POST':
            for attribute, field in checkbox_fields:
                setattr(user, attribute, request.form.get(field) == 'on')
            # An empty note is stored as NULL.
            user.contact_note = request.form.get('contact_note', '').strip() or None
            db.commit()

            logger.info(f"Privacy settings updated for user: {user.email}")
            flash('Ustawienia prywatnosci zostaly zapisane.', 'success')
            return redirect(url_for('auth.settings_privacy'))

        # Unset (None) visibility flags are presented as enabled.
        phone_visible = True if user.privacy_show_phone is None else user.privacy_show_phone
        email_visible = True if user.privacy_show_email is None else user.privacy_show_email
        return render_template('settings/privacy.html',
                               user=user,
                               show_phone=phone_visible,
                               show_email=email_visible)
    except Exception as e:
        logger.error(f"Privacy settings error: {e}")
        flash('Wystapil blad.', 'error')
        return redirect(url_for('dashboard'))
    finally:
        db.close()
# ============================================================
# BLOCK SETTINGS
# ============================================================
@bp.route('/settings/blocks', methods=['GET'])
@login_required
def settings_blocks():
    """List the current user's blocks and the active users that can still be blocked."""
    db = SessionLocal()
    try:
        own_blocks = db.query(UserBlock).filter(
            UserBlock.user_id == current_user.id
        ).all()

        # Candidates exclude everyone already blocked and the user themself.
        excluded_ids = [entry.blocked_user_id for entry in own_blocks] + [current_user.id]
        candidates = (
            db.query(User)
            .filter(User.id.notin_(excluded_ids), User.is_active == True)
            .order_by(User.name)
            .all()
        )

        return render_template('settings/blocks.html',
                               blocks=own_blocks,
                               available_users=candidates)
    finally:
        db.close()
@bp.route('/settings/blocks/add', methods=['POST'])
@login_required
def settings_blocks_add():
    """Create a block from the current user against the posted ``user_id``.

    Rejects a missing id, the user's own id and duplicates; an optional
    ``reason`` is stored with the block. Always ends on the blocks page.
    """
    blocks_page = url_for('auth.settings_blocks')

    target_id = request.form.get('user_id', type=int)
    reason = request.form.get('reason', '').strip()

    if not target_id or target_id == current_user.id:
        flash('Nieprawidlowy uzytkownik.', 'error')
        return redirect(blocks_page)

    db = SessionLocal()
    try:
        already_blocked = db.query(UserBlock).filter(
            UserBlock.user_id == current_user.id,
            UserBlock.blocked_user_id == target_id
        ).first()
        if already_blocked:
            flash('Ten uzytkownik jest juz zablokowany.', 'warning')
            return redirect(blocks_page)

        # An empty reason is stored as NULL.
        db.add(UserBlock(
            user_id=current_user.id,
            blocked_user_id=target_id,
            reason=reason or None
        ))
        db.commit()

        logger.info(f"User {current_user.id} blocked user {target_id}")
        flash('Uzytkownik zostal zablokowany.', 'success')
    except Exception as e:
        logger.error(f"Error blocking user: {e}")
        flash('Wystapil blad.', 'error')
    finally:
        db.close()

    return redirect(blocks_page)
@bp.route('/settings/blocks/remove/<int:block_id>', methods=['POST'])
@login_required
def settings_blocks_remove(block_id):
    """Delete one of the current user's blocks and return to the blocks page.

    The lookup is restricted to blocks owned by the current user, so other
    users' blocks are reported as non-existent.
    """
    blocks_page = url_for('auth.settings_blocks')

    db = SessionLocal()
    try:
        entry = db.query(UserBlock).filter(
            UserBlock.id == block_id,
            UserBlock.user_id == current_user.id
        ).first()
        if not entry:
            flash('Blokada nie istnieje.', 'error')
            return redirect(blocks_page)

        # Remember the id for the log line before the row is deleted.
        unblocked_id = entry.blocked_user_id
        db.delete(entry)
        db.commit()

        logger.info(f"User {current_user.id} unblocked user {unblocked_id}")
        flash('Uzytkownik zostal odblokowany.', 'success')
    except Exception as e:
        logger.error(f"Error unblocking user: {e}")
        flash('Wystapil blad.', 'error')
    finally:
        db.close()

    return redirect(blocks_page)
# ============================================================
# MOJE KONTO - User Account Settings
# ============================================================
@bp.route('/konto')
@login_required
def konto_dane():
    """Render the "my account" page for editing personal data."""
    template_name = 'konto/dane.html'
    return render_template(template_name)
@bp.route('/konto', methods=['POST'])
@login_required
def konto_dane_post():
    """Save the posted name and phone for the current user, then reload the account page."""
    db = SessionLocal()
    try:
        account = db.query(User).filter_by(id=current_user.id).first()
        if account:
            submitted_name = sanitize_input(request.form.get('name', ''), 255)
            submitted_phone = sanitize_input(request.form.get('phone', ''), 50)

            # Empty values are stored as NULL.
            account.name = submitted_name or None
            account.phone = submitted_phone or None
            db.commit()

            # Mirror the saved values onto the logged-in user object.
            current_user.name = account.name
            current_user.phone = account.phone

            logger.info(f"Profile updated for user: {account.email}")
            flash('Dane zostaly zapisane.', 'success')
    except Exception as e:
        logger.error(f"Profile update error: {e}")
        flash('Wystapil blad podczas zapisywania.', 'error')
    finally:
        db.close()

    return redirect(url_for('auth.konto_dane'))
@bp.route('/konto/prywatnosc', methods=['GET', 'POST'])
@login_required
def konto_prywatnosc():
    """Show and save privacy flags and contact preferences on the account pages.

    POST stores the submitted checkboxes and redirects back here; GET renders
    the form. Errors redirect to the account data page.
    """
    # Model attribute -> checkbox field name ('on' means checked).
    checkbox_fields = (
        ('privacy_show_phone', 'show_phone'),
        ('privacy_show_email', 'show_email'),
        ('contact_prefer_email', 'prefer_email'),
        ('contact_prefer_phone', 'prefer_phone'),
        ('contact_prefer_portal', 'prefer_portal'),
    )

    db = SessionLocal()
    try:
        user = db.query(User).filter_by(id=current_user.id).first()

        if request.method == 'POST':
            for attribute, field in checkbox_fields:
                setattr(user, attribute, request.form.get(field) == 'on')
            db.commit()

            logger.info(f"Privacy settings updated for user: {user.email}")
            flash('Ustawienia prywatnosci zostaly zapisane.', 'success')
            return redirect(url_for('auth.konto_prywatnosc'))

        # Unset (None) visibility flags are presented as enabled.
        phone_visible = True if user.privacy_show_phone is None else user.privacy_show_phone
        email_visible = True if user.privacy_show_email is None else user.privacy_show_email
        return render_template('konto/prywatnosc.html',
                               user=user,
                               show_phone=phone_visible,
                               show_email=email_visible)
    except Exception as e:
        logger.error(f"Privacy settings error: {e}")
        flash('Wystapil blad.', 'error')
        return redirect(url_for('auth.konto_dane'))
    finally:
        db.close()
@bp.route('/konto/bezpieczenstwo')
@login_required
def konto_bezpieczenstwo():
    """Render the account security page (2FA, password)."""
    template_name = 'konto/bezpieczenstwo.html'
    return render_template(template_name)
@bp.route('/konto/blokady')
@login_required
def konto_blokady():
    """Render the account page listing blocked users and users available to block.

    Errors are logged and redirect to the account data page.
    """
    db = SessionLocal()
    try:
        own_blocks = db.query(UserBlock).filter_by(user_id=current_user.id).all()

        # Exclude already blocked users and the current user from candidates.
        excluded_ids = [entry.blocked_user_id for entry in own_blocks] + [current_user.id]
        candidates = (
            db.query(User)
            .filter(User.id.notin_(excluded_ids), User.is_active == True)
            .order_by(User.name)
            .all()
        )

        return render_template('konto/blokady.html',
                               blocks=own_blocks,
                               available_users=candidates)
    except Exception as e:
        logger.error(f"Blocks page error: {e}")
        flash('Wystapil blad.', 'error')
        return redirect(url_for('auth.konto_dane'))
    finally:
        db.close()
@bp.route('/konto/blokady/dodaj', methods=['POST'])
@login_required
def konto_blokady_dodaj():
    """Block the user given by the posted ``user_id`` and return to the blocks page.

    Rejects a missing id, the user's own id and an already existing block.
    """
    db = SessionLocal()
    try:
        target_id = request.form.get('user_id', type=int)
        if not target_id or target_id == current_user.id:
            flash('Nieprawidlowy uzytkownik.', 'error')
            return redirect(url_for('auth.konto_blokady'))

        duplicate = db.query(UserBlock).filter_by(
            user_id=current_user.id,
            blocked_user_id=target_id
        ).first()
        if duplicate:
            flash('Ten uzytkownik jest juz zablokowany.', 'info')
            return redirect(url_for('auth.konto_blokady'))

        db.add(UserBlock(user_id=current_user.id, blocked_user_id=target_id))
        db.commit()

        logger.info(f"User {current_user.id} blocked user {target_id}")
        flash('Uzytkownik zostal zablokowany.', 'success')
    except Exception as e:
        logger.error(f"Error blocking user: {e}")
        flash('Wystapil blad.', 'error')
    finally:
        db.close()

    return redirect(url_for('auth.konto_blokady'))
@bp.route('/konto/blokady/usun/<int:block_id>', methods=['POST'])
@login_required
def konto_blokady_usun(block_id):
    """Remove one of the current user's blocks and return to the blocks page.

    Only blocks owned by the current user are matched.
    """
    db = SessionLocal()
    try:
        entry = db.query(UserBlock).filter(
            UserBlock.id == block_id,
            UserBlock.user_id == current_user.id
        ).first()
        if not entry:
            flash('Blokada nie istnieje.', 'error')
            return redirect(url_for('auth.konto_blokady'))

        # Keep the id for the log line before the row is deleted.
        unblocked_id = entry.blocked_user_id
        db.delete(entry)
        db.commit()

        logger.info(f"User {current_user.id} unblocked user {unblocked_id}")
        flash('Uzytkownik zostal odblokowany.', 'success')
    except Exception as e:
        logger.error(f"Error unblocking user: {e}")
        flash('Wystapil blad.', 'error')
    finally:
        db.close()

    return redirect(url_for('auth.konto_blokady'))
# ============================================================
# PASSWORD RESET
# ============================================================
@bp.route('/forgot-password', methods=['GET', 'POST'])
@limiter.limit("20 per hour")
def forgot_password():
    """Start a password reset for the submitted email address.

    For an active account a one-hour reset token is stored and emailed
    (best-effort). The same confirmation message is shown whether or not
    the account exists.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    form_template = 'auth/forgot_password.html'
    if request.method != 'POST':
        return render_template(form_template)

    email = sanitize_input(request.form.get('email', ''), 255)
    if not validate_email(email):
        flash('Nieprawidlowy format adresu email.', 'error')
        return render_template(form_template)

    db = SessionLocal()
    try:
        account = db.query(User).filter_by(email=email, is_active=True).first()

        if account:
            # Store a token that expires after one hour.
            reset_token = secrets.token_urlsafe(32)
            account.reset_token = reset_token
            account.reset_token_expires = datetime.now() + timedelta(hours=1)
            db.commit()

            base_url = os.getenv('APP_URL', 'https://nordabiznes.pl')
            reset_url = f"{base_url}/reset-password/{reset_token}"

            # Delivery is best-effort; only a token prefix is logged.
            try:
                import email_service
                if not email_service.is_configured():
                    logger.warning("Email service not configured")
                    logger.info(f"Reset token (no email) for {email}: {reset_token[:8]}...")
                elif email_service.send_password_reset_email(email, reset_url):
                    logger.info(f"Password reset email sent to {email}")
                else:
                    logger.warning(f"Failed to send password reset email to {email}")
                    logger.info(f"Reset token (email failed) for {email}: {reset_token[:8]}...")
            except Exception as e:
                logger.error(f"Error sending reset email: {e}")
                logger.info(f"Reset token (exception) for {email}: {reset_token[:8]}...")

        # Identical response for known and unknown emails (no enumeration).
        flash('Jesli email istnieje w systemie, instrukcje resetowania hasla zostaly wyslane.', 'info')
        return redirect(url_for('login'))
    except Exception as e:
        logger.error(f"Password reset error: {e}")
        flash('Wystapil blad. Sprobuj ponownie.', 'error')
    finally:
        db.close()

    return render_template(form_template)
@bp.route('/reset-password/<token>', methods=['GET', 'POST'])
@limiter.limit("30 per hour")
def reset_password(token):
    """Set a new password for the active user owning an unexpired reset token.

    GET shows the form; POST checks that both passwords match and pass
    validate_password, stores the new hash, clears the reset token and marks
    the email as verified if it was not yet. Invalid or expired tokens
    redirect to the forgot-password page.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    form_template = 'auth/reset_password.html'

    db = SessionLocal()
    try:
        account = db.query(User).filter(
            User.reset_token == token,
            User.reset_token_expires > datetime.now(),
            User.is_active == True
        ).first()

        if not account:
            flash('Link resetowania hasla jest nieprawidlowy lub wygasl.', 'error')
            return redirect(url_for('auth.forgot_password'))

        if request.method != 'POST':
            return render_template(form_template, token=token)

        new_password = request.form.get('password', '')
        confirmation = request.form.get('password_confirm', '')

        if new_password != confirmation:
            flash('Hasla nie sa identyczne.', 'error')
            return render_template(form_template, token=token)

        strong_enough, strength_message = validate_password(new_password)
        if not strong_enough:
            flash(strength_message, 'error')
            return render_template(form_template, token=token)

        # Store the new hash and invalidate the token.
        account.password_hash = generate_password_hash(new_password, method='pbkdf2:sha256')
        account.reset_token = None
        account.reset_token_expires = None

        # Using the emailed link proves inbox access, so verify the email too.
        if not account.is_verified:
            account.is_verified = True
            account.verified_at = datetime.now()
            account.verification_token = None
            account.verification_token_expires = None
            logger.info(f"Email auto-verified via password reset for {account.email}")

        db.commit()

        logger.info(f"Password reset successful for {account.email}")
        flash('Haslo zostalo zmienione. Mozesz sie teraz zalogowac.', 'success')
        return redirect(url_for('login'))
    except Exception as e:
        logger.error(f"Reset password error: {e}")
        flash('Wystapil blad. Sprobuj ponownie.', 'error')
        return redirect(url_for('auth.forgot_password'))
    finally:
        db.close()
# ============================================================
# EMAIL VERIFICATION
# ============================================================
@bp.route('/verify-email/<token>')
def verify_email(token):
    """Verify the email of the active user owning an unexpired verification token.

    On success the account is marked verified, the token is cleared, the user
    is logged in (remembered) and both events are written to the audit log.
    An already verified account is logged in if nobody is authenticated.
    Invalid tokens and errors redirect to the login page.
    """
    db = SessionLocal()
    try:
        account = db.query(User).filter(
            User.verification_token == token,
            User.verification_token_expires > datetime.now(),
            User.is_active == True
        ).first()

        if not account:
            flash('Link weryfikacyjny jest nieprawidlowy lub wygasl.', 'error')
            return redirect(url_for('login'))

        if account.is_verified:
            if current_user.is_authenticated:
                notice = 'Email zostal juz zweryfikowany.'
            else:
                # Already verified - log the visitor in automatically.
                login_user(account, remember=True)
                account.last_login = datetime.now()
                db.commit()
                notice = 'Witamy ponownie! Zostales automatycznie zalogowany.'
            flash(notice, 'info')
            return redirect(url_for('dashboard'))

        # Mark as verified and invalidate the token.
        account.is_verified = True
        account.verified_at = datetime.now()
        account.verification_token = None
        account.verification_token_expires = None

        # Log the user in right after verification.
        login_user(account, remember=True)
        account.last_login = datetime.now()

        # First X-Forwarded-For entry, else the socket peer address.
        client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
        if client_ip and ',' in client_ip:
            client_ip = client_ip.split(',')[0].strip()

        audit_events = (
            ('email_verified', {'ip': client_ip}),
            ('login', {'method': 'email_verification', 'ip': client_ip}),
        )
        for action, details in audit_events:
            log_audit(db, action, 'user', entity_id=account.id, entity_name=account.email,
                      details=details)

        db.commit()

        logger.info(f"Email verified and auto-logged in: {account.email}")
        flash('Email zweryfikowany! Witamy w Norda Biznes Partner.', 'success')
        return redirect(url_for('dashboard'))
    except Exception as e:
        logger.error(f"Email verification error: {e}")
        flash('Wystapil blad podczas weryfikacji.', 'error')
        return redirect(url_for('login'))
    finally:
        db.close()
@bp.route('/resend-verification', methods=['GET', 'POST'])
@limiter.limit("15 per hour")
def resend_verification():
    """Issue and email a fresh verification link for an unverified account.

    For an active, unverified account a new 24-hour token replaces the old
    one and is emailed (best-effort). The same confirmation message is shown
    regardless of whether such an account exists.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    form_template = 'auth/resend_verification.html'
    if request.method != 'POST':
        return render_template(form_template)

    email = sanitize_input(request.form.get('email', ''), 255)
    if not validate_email(email):
        flash('Nieprawidlowy format adresu email.', 'error')
        return render_template(form_template)

    db = SessionLocal()
    try:
        account = db.query(User).filter_by(email=email, is_active=True).first()

        if account and not account.is_verified:
            # Replace the stored token with a new one valid for 24 hours.
            verification_token = secrets.token_urlsafe(32)
            account.verification_token = verification_token
            account.verification_token_expires = datetime.now() + timedelta(hours=24)
            db.commit()

            base_url = os.getenv('APP_URL', 'https://nordabiznes.pl')
            verification_url = f"{base_url}/verify-email/{verification_token}"

            # Delivery is best-effort; only a token prefix is logged.
            try:
                import email_service
                if not email_service.is_configured():
                    logger.warning("Email service not configured")
                    logger.info(f"Resend verification token (no email) for {email}: {verification_token[:8]}...")
                elif email_service.send_welcome_email(email, account.name, verification_url):
                    logger.info(f"Verification email resent to {email}")
                else:
                    logger.warning(f"Failed to resend verification email to {email}")
                    logger.info(f"Resend verification token (email failed) for {email}: {verification_token[:8]}...")
            except Exception as e:
                logger.error(f"Error resending verification email: {e}")
                logger.info(f"Resend verification token (exception) for {email}: {verification_token[:8]}...")

        # Identical response in every case (no enumeration).
        flash('Jesli konto istnieje i nie zostalo zweryfikowane, email weryfikacyjny zostal wyslany.', 'info')
        return redirect(url_for('login'))
    except Exception as e:
        logger.error(f"Resend verification error: {e}")
        flash('Wystapil blad. Sprobuj ponownie.', 'error')
    finally:
        db.close()

    return render_template(form_template)