nordabiz/app.py
Maciej Pienczyn cadf91b481 feat(social-audit): Add ability to run Social Media audit
- Add "Uruchom audyt" button to social_audit.html
- Create POST /api/social/audit endpoint to verify profile URLs
- Add loading overlay and modal for audit progress/results
- Audit verifies each social media URL and updates check_status

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 05:07:58 +01:00

5051 lines
170 KiB
Python

#!/usr/bin/env python3
"""
Norda Biznes Hub - Flask Application
====================================
Main Flask application for Norda Biznes company directory with AI chat.
Features:
- User authentication with email confirmation
- Company directory with advanced search
- AI chat assistant powered by Google Gemini
- PostgreSQL database integration
- Analytics dashboard for chat insights
Author: Norda Biznes Development Team
Created: 2025-11-23
"""
import os
import logging
import secrets
import re
import json
from collections import deque
from datetime import datetime, timedelta
from flask import Flask, render_template, request, jsonify, redirect, url_for, flash, session, Response
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from flask_wtf.csrf import CSRFProtect
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from werkzeug.security import generate_password_hash, check_password_hash
from dotenv import load_dotenv
# Load environment variables (override any existing env vars)
# Try .env first, then nordabiz_config.txt for production flexibility
import os
# Load the first configuration file that exists; when neither is present,
# call load_dotenv() without a path and let it locate a file on its own.
for _env_file in ('.env', 'nordabiz_config.txt'):
    if os.path.exists(_env_file):
        load_dotenv(_env_file, override=True)
        break
else:
    load_dotenv(override=True)
# Configure logging with in-memory buffer for debug panel
class DebugLogHandler(logging.Handler):
    """Logging handler that keeps the most recent records in memory.

    Every handled record is converted to a plain dict and appended to
    ``self.logs``, a bounded deque: once ``max_logs`` entries are stored the
    oldest entry is discarded automatically.
    """

    def __init__(self, max_logs=500):
        """Create the handler with room for at most *max_logs* entries."""
        super().__init__()
        self.logs = deque(maxlen=max_logs)

    def emit(self, record):
        """Format *record* and store it in the in-memory buffer.

        A failure while formatting (for example a message whose %-arguments
        do not match) is handed to ``handleError`` instead of being raised,
        so a bad log call cannot break the code that issued it.
        """
        try:
            log_entry = {
                # Time the record was created, not the time it was handled.
                'timestamp': datetime.fromtimestamp(record.created).isoformat(),
                'level': record.levelname,
                'logger': record.name,
                'message': self.format(record),
                'module': record.module,
                'funcName': record.funcName,
                'lineno': record.lineno
            }
            self.logs.append(log_entry)
        except Exception:
            self.handleError(record)
# Root logging goes through basicConfig first; the in-memory handler for the
# debug panel is attached to the root logger afterwards.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
debug_handler = DebugLogHandler(max_logs=500)
debug_handler.setFormatter(logging.Formatter('%(message)s'))
_root_logger = logging.getLogger()
_root_logger.addHandler(debug_handler)
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
# Import database models
from database import (
init_db,
SessionLocal,
User,
Company,
Category,
Service,
Competency,
CompanyDigitalMaturity,
CompanyWebsiteAnalysis,
CompanyQualityTracking,
CompanyWebsiteContent,
CompanyAIInsights,
CompanyEvent,
CompanySocialMedia,
CompanyContact,
AIChatConversation,
AIChatMessage,
AIChatFeedback,
AIAPICostLog,
ForumTopic,
ForumReply,
NordaEvent,
EventAttendee,
PrivateMessage,
Classified,
UserNotification,
CompanyRecommendation,
MembershipFee,
MembershipFeeConfig
)
# Import services
import gemini_service
from nordabiz_chat import NordaBizChatEngine
from search_service import search_companies
import krs_api_service
# Optional news service: NEWS_SERVICE_AVAILABLE records whether it could be imported.
try:
    from news_service import NewsService, get_news_service, init_news_service
except ImportError:
    NEWS_SERVICE_AVAILABLE = False
    logger.warning("News service not available")
else:
    NEWS_SERVICE_AVAILABLE = True
# Optional SEO audit components (used to trigger audits via the API).
# They live in the sibling "scripts" directory, which is put on sys.path first.
import sys

_scripts_path = os.path.join(os.path.dirname(__file__), 'scripts')
if _scripts_path not in sys.path:
    sys.path.insert(0, _scripts_path)

try:
    from seo_audit import SEOAuditor, SEO_AUDIT_VERSION
except ImportError as e:
    SEO_AUDIT_AVAILABLE = False
    logger.warning(f"SEO audit service not available: {e}")
else:
    SEO_AUDIT_AVAILABLE = True
# Optional GBP (Google Business Profile) audit service.
# The availability flag and version are set together on either outcome.
try:
    from gbp_audit_service import (
        GBPAuditService,
        audit_company as gbp_audit_company,
        get_company_audit as gbp_get_company_audit,
        fetch_google_business_data as gbp_fetch_google_data
    )
except ImportError as e:
    GBP_AUDIT_AVAILABLE = False
    GBP_AUDIT_VERSION = None
    logger.warning(f"GBP audit service not available: {e}")
else:
    GBP_AUDIT_AVAILABLE = True
    GBP_AUDIT_VERSION = '1.0'
# Initialize Flask app
app = Flask(__name__)

# Session signing key. The built-in fallback value is publicly known, so
# session cookies signed with it can be forged; an unset or empty SECRET_KEY
# outside development is therefore reported loudly instead of passing silently.
_secret_key = os.getenv('SECRET_KEY')
if not _secret_key:
    if os.getenv('FLASK_ENV') != 'development':
        logger.critical(
            "SECRET_KEY is not set - falling back to the insecure development key. "
            "Set SECRET_KEY in the environment before serving real traffic."
        )
    _secret_key = 'dev-secret-key-change-in-production'
app.config['SECRET_KEY'] = _secret_key
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=7)

# Security configurations
app.config['WTF_CSRF_ENABLED'] = True
app.config['WTF_CSRF_TIME_LIMIT'] = None  # No time limit for CSRF tokens
# Secure (HTTPS-only) session cookie unless FLASK_ENV is exactly 'development'.
app.config['SESSION_COOKIE_SECURE'] = os.getenv('FLASK_ENV') != 'development'
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
# Template filters
@app.template_filter('ensure_url')
def ensure_url_filter(url):
    """Return *url* with an http(s) scheme, defaulting to ``https://``.

    Empty values are passed through unchanged. Surrounding whitespace is
    removed, the scheme check ignores case (``HTTP://x`` is left alone), and
    protocol-relative URLs (``//host/path``) get ``https:`` prepended.
    """
    if not url:
        return url
    url = url.strip()
    if not url:
        return url
    if url.lower().startswith(('http://', 'https://')):
        return url
    if url.startswith('//'):
        return f'https:{url}'
    return f'https://{url}'
# Initialize CSRF protection
# Wraps the app with Flask-WTF's CSRFProtect; the instance is kept in `csrf`.
csrf = CSRFProtect(app)
# Initialize rate limiter
# Limits are keyed by get_remote_address and default to 200/day and 50/hour.
# NOTE(review): storage_uri "memory://" presumably keeps counters in process
# memory (per worker, lost on restart) - confirm for multi-worker deployments.
limiter = Limiter(
    app=app,
    key_func=get_remote_address,
    default_limits=["200 per day", "50 per hour"],
    storage_uri="memory://"
)
# Initialize database
# init_db comes from the project's `database` module and runs at import time.
init_db()
# Initialize Login Manager
# 'login' is the view name used for login redirects; login_message is the
# (Polish) text shown to the user alongside that redirect.
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
login_manager.login_message = 'Zaloguj się, aby uzyskać dostęp do tej strony.'
# Initialize Gemini service.
# A failure here is logged and startup continues without the AI service.
try:
    gemini_service.init_gemini_service(model='flash-2.0')  # Gemini 2.0 Flash (free while in preview)
except Exception as e:
    logger.error(f"Failed to initialize Gemini service: {e}")
else:
    logger.info("Gemini service initialized successfully")
@login_manager.user_loader
def load_user(user_id):
    """Load the user for a session-stored id.

    Args:
        user_id: Identifier taken from the session (normally a numeric string).

    Returns:
        The matching ``User`` row, or ``None`` when the id is not a valid
        integer or no such user exists.
    """
    # A tampered or stale session may carry a non-numeric id; treat that as
    # "no such user" rather than letting int() raise.
    try:
        numeric_id = int(user_id)
    except (TypeError, ValueError):
        return None

    db = SessionLocal()
    try:
        return db.query(User).filter_by(id=numeric_id).first()
    finally:
        db.close()
# ============================================================
# TEMPLATE CONTEXT PROCESSORS
# ============================================================
@app.context_processor
def inject_globals():
    """Expose values every template can use (currently just the year)."""
    this_year = datetime.now().year
    return dict(current_year=this_year)
@app.context_processor
def inject_notifications():
    """Provide ``unread_notifications_count`` to every template.

    Anonymous visitors always get 0; for signed-in users the unread
    notifications are counted in the database.
    """
    if not current_user.is_authenticated:
        return {'unread_notifications_count': 0}

    session_ = SessionLocal()
    try:
        unread = session_.query(UserNotification).filter(
            UserNotification.user_id == current_user.id,
            UserNotification.is_read == False
        )
        return {'unread_notifications_count': unread.count()}
    finally:
        session_.close()
# ============================================================
# NOTIFICATION HELPERS
# ============================================================
def create_notification(user_id, title, message, notification_type='info',
                        related_type=None, related_id=None, action_url=None):
    """
    Store a new notification for one user.

    Args:
        user_id: ID of the user to notify
        title: Notification title
        message: Notification message/body
        notification_type: Type of notification (defaults to 'info')
        related_type: Type of related entity (company_news, event, message, etc.)
        related_id: ID of the related entity
        action_url: URL to navigate to when the notification is clicked

    Returns:
        The saved UserNotification, or None if anything failed (the error is
        logged and the transaction rolled back).
    """
    fields = dict(
        user_id=user_id,
        title=title,
        message=message,
        notification_type=notification_type,
        related_type=related_type,
        related_id=related_id,
        action_url=action_url,
    )
    session_ = SessionLocal()
    try:
        record = UserNotification(**fields)
        session_.add(record)
        session_.commit()
        # Reload the row after the commit, before the session is closed.
        session_.refresh(record)
        logger.info(f"Created notification for user {user_id}: {title}")
        return record
    except Exception as exc:
        logger.error(f"Error creating notification: {exc}")
        session_.rollback()
        return None
    finally:
        session_.close()
def create_news_notification(company_id, news_id, news_title):
    """
    Notify every active user of a company that a news item was approved.

    Args:
        company_id: ID of the company
        news_id: ID of the approved news
        news_title: Title of the news
    """
    session_ = SessionLocal()
    try:
        recipients = (
            session_.query(User)
            .filter(User.company_id == company_id, User.is_active == True)
            .all()
        )
        # Same payload for every recipient; only user_id differs.
        shared = {
            'title': "Nowa aktualnosc o Twojej firmie",
            'message': f"Aktualnosc '{news_title}' zostala zatwierdzona i jest widoczna na profilu firmy.",
            'notification_type': 'news',
            'related_type': 'company_news',
            'related_id': news_id,
            'action_url': f"/company/{company_id}",
        }
        for recipient in recipients:
            create_notification(user_id=recipient.id, **shared)
    finally:
        session_.close()
# ============================================================
# SECURITY MIDDLEWARE & HELPERS
# ============================================================
@app.after_request
def set_security_headers(response):
    """Attach the fixed set of security headers to *response* and return it."""
    # Content Security Policy, one directive per entry.
    csp_directives = [
        "default-src 'self'",
        "script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net",
        "style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net https://fonts.googleapis.com",
        "img-src 'self' data: https:",
        "font-src 'self' https://cdn.jsdelivr.net https://fonts.gstatic.com",
        "connect-src 'self'",
    ]
    security_headers = (
        ('X-Content-Type-Options', 'nosniff'),
        ('X-Frame-Options', 'SAMEORIGIN'),
        ('X-XSS-Protection', '1; mode=block'),
        ('Strict-Transport-Security', 'max-age=31536000; includeSubDomains'),
        ('Referrer-Policy', 'strict-origin-when-cross-origin'),
        ('Content-Security-Policy', "; ".join(csp_directives)),
    )
    for header_name, header_value in security_headers:
        response.headers[header_name] = header_value
    return response
def validate_email(email):
    """Return True when *email* looks like a plausible e-mail address.

    This is a pragmatic ``local@domain.tld`` shape check, not a full
    RFC 5322 parser. Non-string values, empty strings and anything longer
    than 255 characters are rejected.
    """
    if not isinstance(email, str) or not email or len(email) > 255:
        return False
    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    # fullmatch, because "$" with re.match also matches just before a trailing
    # newline and would let "user@example.com\n" through.
    return re.fullmatch(pattern, email) is not None
def validate_password(password):
    """
    Validate password strength.

    Requirements:
    - Minimum 8 characters
    - At least one uppercase letter
    - At least one lowercase letter
    - At least one digit

    Letter case is checked per character with ``str.isupper`` /
    ``str.islower`` so that non-ASCII letters (e.g. Polish "Ż", "ł") count.

    Returns:
        Tuple ``(is_valid, message)``; the message is "OK" on success and a
        Polish explanation of the first failed rule otherwise.
    """
    if not password or len(password) < 8:
        return False, "Hasło musi mieć minimum 8 znaków"
    if not any(ch.isupper() for ch in password):
        return False, "Hasło musi zawierać przynajmniej jedną wielką literę"
    if not any(ch.islower() for ch in password):
        return False, "Hasło musi zawierać przynajmniej jedną małą literę"
    if not re.search(r'\d', password):
        return False, "Hasło musi zawierać przynajmniej jedną cyfrę"
    return True, "OK"
def sanitize_input(text, max_length=1000):
    """Normalize a user-supplied string.

    Removes null bytes, trims surrounding whitespace and limits the result to
    *max_length* characters. No other characters are filtered or escaped.

    Args:
        text: Raw input; any falsy value yields "".
        max_length: Maximum length of the returned string.

    Returns:
        The cleaned string, never longer than *max_length*.
    """
    if not text:
        return ""
    # Strip before truncating so leading whitespace does not use up the
    # length budget; strip again because the cut may land on whitespace.
    cleaned = text.replace('\x00', '').strip()
    return cleaned[:max_length].strip()
def get_free_tier_usage():
    """
    Report today's Gemini API usage (for free-tier tracking).

    Returns:
        Dict with requests_today and tokens_today; both are 0 when the
        lookup fails (the failure is logged as a warning).
    """
    from datetime import date
    from sqlalchemy import func

    session_ = SessionLocal()
    try:
        request_count = func.count(AIAPICostLog.id).label('requests')
        token_total = func.coalesce(func.sum(AIAPICostLog.total_tokens), 0).label('tokens')
        row = (
            session_.query(request_count, token_total)
            .filter(
                func.date(AIAPICostLog.timestamp) == date.today(),
                AIAPICostLog.api_provider == 'gemini',
            )
            .first()
        )
        usage = {
            'requests_today': row.requests or 0,
            'tokens_today': int(row.tokens or 0),
        }
        return usage
    except Exception as exc:
        logger.warning(f"Failed to get free tier usage: {exc}")
        return {'requests_today': 0, 'tokens_today': 0}
    finally:
        session_.close()
def get_brave_api_usage():
    """
    Report Brave Search API usage for today and the current month.

    Brave free tier: 2000 requests/month

    Returns:
        Dict with usage stats and limits; zeroed defaults are returned when
        the lookup fails (the failure is logged as a warning).
    """
    from datetime import date
    from sqlalchemy import func, extract

    session_ = SessionLocal()
    try:
        today = date.today()

        def _brave_calls(*period_filters):
            # Count logged Brave calls restricted to the given time period.
            row = session_.query(
                func.count(AIAPICostLog.id).label('requests')
            ).filter(
                *period_filters,
                AIAPICostLog.api_provider == 'brave'
            ).first()
            return row.requests or 0

        monthly_used = _brave_calls(
            extract('month', AIAPICostLog.timestamp) == today.month,
            extract('year', AIAPICostLog.timestamp) == today.year,
        )
        daily_used = _brave_calls(func.date(AIAPICostLog.timestamp) == today)

        monthly_limit = 2000  # Brave free tier
        if monthly_limit > 0:
            usage_percent = round((monthly_used / monthly_limit) * 100, 1)
        else:
            usage_percent = 0
        return {
            'requests_today': daily_used,
            'requests_this_month': monthly_used,
            'monthly_limit': monthly_limit,
            'remaining': max(0, monthly_limit - monthly_used),
            'usage_percent': usage_percent,
            'tier': 'free',
            'is_limit_reached': monthly_used >= monthly_limit
        }
    except Exception as exc:
        logger.warning(f"Failed to get Brave API usage: {exc}")
        return {
            'requests_today': 0,
            'requests_this_month': 0,
            'monthly_limit': 2000,
            'remaining': 2000,
            'usage_percent': 0,
            'tier': 'free',
            'is_limit_reached': False
        }
    finally:
        session_.close()
def log_brave_api_call(user_id=None, feature='news_search', company_name=None):
    """
    Record one Brave API call in the API cost log for usage tracking.

    Args:
        user_id: User who triggered the call (optional)
        feature: Feature name (news_search, etc.)
        company_name: Company being searched (only used in the debug log line)
    """
    session_ = SessionLocal()
    try:
        # Brave calls carry no token usage, so all token fields are zero.
        session_.add(AIAPICostLog(
            api_provider='brave',
            model_name='search_api',
            feature=feature,
            user_id=user_id,
            input_tokens=0,
            output_tokens=0,
            total_tokens=0,
        ))
        session_.commit()
        logger.debug(f"Logged Brave API call: {feature} for {company_name}")
    except Exception as exc:
        logger.error(f"Failed to log Brave API call: {exc}")
        session_.rollback()
    finally:
        session_.close()
# ============================================================
# HEALTH CHECK
# ============================================================
@app.route('/health')
def health():
    """Liveness probe for monitoring: always answers 200 with status 'ok'."""
    payload = {'status': 'ok'}
    return payload, 200
# ============================================================
# PUBLIC ROUTES
# ============================================================
@app.route('/')
def index():
    """Homepage: landing page for guests, company directory for signed-in users."""
    db = SessionLocal()
    try:
        if not current_user.is_authenticated:
            # Landing page for guests: only the two headline numbers are needed.
            return render_template(
                'landing.html',
                total_companies=db.query(Company).filter_by(status='active').count(),
                total_categories=db.query(Category).count()
            )

        # Company directory for logged in users
        companies = db.query(Company).filter_by(status='active').order_by(Company.name).all()
        categories = db.query(Category).order_by(Category.sort_order).all()

        # Categories that have at least one company. All referenced category
        # ids are fetched with a single DISTINCT query instead of issuing one
        # COUNT query per category.
        used_category_ids = {
            category_id
            for (category_id,) in db.query(Company.category_id).distinct()
        }
        total_categories = sum(1 for c in categories if c.id in used_category_ids)

        return render_template(
            'index.html',
            companies=companies,
            categories=categories,
            total_companies=len(companies),
            total_categories=total_categories
        )
    finally:
        db.close()
@app.route('/company/<int:company_id>')
# @login_required  # Public access
def company_detail(company_id):
    """Company detail page (public: the login requirement is disabled)."""
    session_ = SessionLocal()
    try:
        company = session_.query(Company).filter_by(id=company_id).first()
        if not company:
            flash('Firma nie znaleziona.', 'error')
            return redirect(url_for('index'))

        def _for_company(model):
            # Base query for rows of *model* that belong to this company.
            return session_.query(model).filter_by(company_id=company_id)

        context = {
            'company': company,
            # Digital maturity and website analysis, if available
            'maturity_data': _for_company(CompanyDigitalMaturity).first(),
            'website_analysis': _for_company(CompanyWebsiteAnalysis).first(),
            # Quality tracking data
            'quality_data': _for_company(CompanyQualityTracking).first(),
            # Latest 10 company events
            'events': _for_company(CompanyEvent).order_by(
                CompanyEvent.event_date.desc(),
                CompanyEvent.created_at.desc()
            ).limit(10).all(),
            # Most recent website scrape
            'website_content': _for_company(CompanyWebsiteContent).order_by(
                CompanyWebsiteContent.scraped_at.desc()
            ).first(),
            'ai_insights': _for_company(CompanyAIInsights).first(),
            'social_media': _for_company(CompanySocialMedia).all(),
            # Contacts grouped by type, primary entries first
            'contacts': _for_company(CompanyContact).order_by(
                CompanyContact.contact_type,
                CompanyContact.is_primary.desc()
            ).all(),
            # Approved recommendations only, newest first
            'recommendations': _for_company(CompanyRecommendation).filter_by(
                status='approved'
            ).join(User, CompanyRecommendation.user_id == User.id).order_by(
                CompanyRecommendation.created_at.desc()
            ).all(),
        }
        return render_template('company_detail.html', **context)
    finally:
        session_.close()
@app.route('/company/<slug>')
# @login_required  # Disabled - public access
def company_detail_by_slug(slug):
    """Resolve a company slug and redirect to the canonical numeric-id page."""
    session_ = SessionLocal()
    try:
        match = session_.query(Company).filter_by(slug=slug).first()
        if match:
            # Redirect to canonical int ID route
            return redirect(url_for('company_detail', company_id=match.id))
        flash('Firma nie znaleziona.', 'error')
        return redirect(url_for('index'))
    finally:
        session_.close()
@app.route('/company/<slug>/recommend', methods=['GET', 'POST'])
# @login_required  # Disabled - public access
def company_recommend(slug):
    """Show the recommendation form for a company and handle its submission.

    The form page (GET) stays public. Submitting (POST) requires a signed-in
    user because the recommendation is stored against ``current_user``;
    anonymous submissions are sent through the login manager's standard
    "unauthorized" handling.
    """
    db = SessionLocal()
    try:
        # Get company
        company = db.query(Company).filter_by(slug=slug).first()
        if not company:
            flash('Firma nie znaleziona.', 'error')
            return redirect(url_for('index'))

        # Handle GET (show form)
        if request.method != 'POST':
            return render_template('company/recommend.html', company=company)

        # The code below reads current_user.id / current_user.company_id,
        # which an anonymous user does not have. Without this guard an
        # anonymous POST fails with AttributeError.
        if not current_user.is_authenticated:
            return login_manager.unauthorized()

        # Handle POST (form submission)
        recommendation_text = request.form.get('recommendation_text', '').strip()
        service_category = sanitize_input(request.form.get('service_category', ''), 200)
        show_contact = request.form.get('show_contact') == '1'

        # Validation
        if not recommendation_text or len(recommendation_text) < 50:
            flash('Rekomendacja musi mieć co najmniej 50 znaków.', 'error')
            return render_template('company/recommend.html', company=company)
        if len(recommendation_text) > 2000:
            flash('Rekomendacja może mieć maksymalnie 2000 znaków.', 'error')
            return render_template('company/recommend.html', company=company)

        # Prevent self-recommendation
        if current_user.company_id == company.id:
            flash('Nie możesz polecać własnej firmy.', 'error')
            return redirect(url_for('company_detail', company_id=company.id))

        # Check for duplicate (user already recommended this company)
        existing = db.query(CompanyRecommendation).filter_by(
            user_id=current_user.id,
            company_id=company.id
        ).first()
        if existing:
            flash('Już poleciłeś tę firmę. Możesz edytować swoją wcześniejszą rekomendację.', 'error')
            return redirect(url_for('company_detail', company_id=company.id))

        # Create recommendation; it stays 'pending' until moderated.
        recommendation = CompanyRecommendation(
            company_id=company.id,
            user_id=current_user.id,
            recommendation_text=recommendation_text,
            service_category=service_category if service_category else None,
            show_contact=show_contact,
            status='pending'
        )
        db.add(recommendation)
        db.commit()
        flash('Dziękujemy! Twoja rekomendacja została przesłana i oczekuje na moderację.', 'success')
        return redirect(url_for('company_detail', company_id=company.id))
    finally:
        db.close()
@app.route('/search')
@login_required
def search():
    """Search companies via the search service - requires login."""
    query = request.args.get('q', '')
    category_id = request.args.get('category', type=int)

    session_ = SessionLocal()
    try:
        # search_companies returns result objects carrying the company and
        # the kind of match that produced it.
        hits = search_companies(session_, query, category_id, limit=50)
        companies = [hit.company for hit in hits]

        # For debugging/analytics - log how many hits each match type produced
        if query:
            match_types = {}
            for hit in hits:
                match_types.setdefault(hit.match_type, 0)
                match_types[hit.match_type] += 1
            logger.info(f"Search '{query}': {len(companies)} results, types: {match_types}")

        return render_template(
            'search_results.html',
            companies=companies,
            query=query,
            category_id=category_id,
            result_count=len(companies)
        )
    finally:
        session_.close()
@app.route('/aktualnosci')
@login_required
def events():
    """Company events and news - latest updates from member companies.

    Query parameters: ``type`` (event type filter), ``company`` (company id
    filter) and ``page`` (1-based page number, 20 events per page).
    """
    from sqlalchemy import func

    event_type_filter = request.args.get('type', '')
    company_id = request.args.get('company', type=int)
    # Clamp the page: 0 or a negative value would produce a negative OFFSET.
    page = max(1, request.args.get('page', 1, type=int))
    per_page = 20

    db = SessionLocal()
    try:
        # Build query
        query = db.query(CompanyEvent).join(Company)

        # Apply filters
        if event_type_filter:
            query = query.filter(CompanyEvent.event_type == event_type_filter)
        if company_id:
            query = query.filter(CompanyEvent.company_id == company_id)

        # Order by date (newest first)
        query = query.order_by(
            CompanyEvent.event_date.desc(),
            CompanyEvent.created_at.desc()
        )

        # Pagination
        total_events = query.count()
        page_events = query.limit(per_page).offset((page - 1) * per_page).all()

        # Get companies with events for filter dropdown
        companies_with_events = db.query(Company).join(CompanyEvent).distinct().order_by(Company.name).all()

        # Event type statistics: (event_type, count) pairs
        event_types = db.query(
            CompanyEvent.event_type,
            func.count(CompanyEvent.id)
        ).group_by(CompanyEvent.event_type).all()

        return render_template(
            'events.html',
            events=page_events,
            companies_with_events=companies_with_events,
            event_types=event_types,
            event_type_filter=event_type_filter,
            company_id=company_id,
            page=page,
            per_page=per_page,
            total_events=total_events,
            # Ceiling division without floats
            total_pages=(total_events + per_page - 1) // per_page
        )
    finally:
        db.close()
# ============================================================
# FORUM ROUTES
# ============================================================
@app.route('/forum')
@login_required
def forum_index():
    """Forum - paginated list of topics (20 per page, ``page`` is 1-based)."""
    # Clamp the page: 0 or a negative value would produce a negative OFFSET.
    page = max(1, request.args.get('page', 1, type=int))
    per_page = 20

    db = SessionLocal()
    try:
        # Pinned topics first, then by most recent activity
        query = db.query(ForumTopic).order_by(
            ForumTopic.is_pinned.desc(),
            ForumTopic.updated_at.desc()
        )
        total_topics = query.count()
        topics = query.limit(per_page).offset((page - 1) * per_page).all()

        return render_template(
            'forum/index.html',
            topics=topics,
            page=page,
            per_page=per_page,
            total_topics=total_topics,
            # Ceiling division without floats
            total_pages=(total_topics + per_page - 1) // per_page
        )
    finally:
        db.close()
@app.route('/forum/nowy', methods=['GET', 'POST'])
@login_required
def forum_new_topic():
    """Show the new-topic form (GET) or create the topic (POST)."""
    if request.method != 'POST':
        return render_template('forum/new_topic.html')

    title = sanitize_input(request.form.get('title', ''), 255)
    content = request.form.get('content', '').strip()

    # Title is checked first; the first failing rule decides the message.
    problem = None
    if not title or len(title) < 5:
        problem = 'Tytuł musi mieć co najmniej 5 znaków.'
    elif not content or len(content) < 10:
        problem = 'Treść musi mieć co najmniej 10 znaków.'
    if problem is not None:
        flash(problem, 'error')
        return render_template('forum/new_topic.html')

    session_ = SessionLocal()
    try:
        new_topic = ForumTopic(
            title=title,
            content=content,
            author_id=current_user.id
        )
        session_.add(new_topic)
        session_.commit()
        # Reload so the generated id is available for the redirect.
        session_.refresh(new_topic)
        flash('Temat został utworzony.', 'success')
        return redirect(url_for('forum_topic', topic_id=new_topic.id))
    finally:
        session_.close()
@app.route('/forum/<int:topic_id>')
@login_required
def forum_topic(topic_id):
    """Display one forum topic and count the view."""
    session_ = SessionLocal()
    try:
        topic = session_.query(ForumTopic).filter(ForumTopic.id == topic_id).first()
        if topic:
            # Increment view count
            topic.views_count += 1
            session_.commit()
            return render_template('forum/topic.html', topic=topic)
        flash('Temat nie istnieje.', 'error')
        return redirect(url_for('forum_index'))
    finally:
        session_.close()
@app.route('/forum/<int:topic_id>/odpowiedz', methods=['POST'])
@login_required
def forum_reply(topic_id):
    """Post a reply to a forum topic, unless the topic is missing or locked."""
    back_to_topic = url_for('forum_topic', topic_id=topic_id)

    content = request.form.get('content', '').strip()
    if not content or len(content) < 3:
        flash('Odpowiedź musi mieć co najmniej 3 znaki.', 'error')
        return redirect(back_to_topic)

    session_ = SessionLocal()
    try:
        topic = session_.query(ForumTopic).filter(ForumTopic.id == topic_id).first()
        if not topic:
            flash('Temat nie istnieje.', 'error')
            return redirect(url_for('forum_index'))
        if topic.is_locked:
            flash('Ten temat jest zamknięty.', 'error')
            return redirect(back_to_topic)

        session_.add(ForumReply(
            topic_id=topic_id,
            author_id=current_user.id,
            content=content
        ))
        # Touch the topic so it counts as recently active.
        topic.updated_at = datetime.now()
        session_.commit()

        flash('Odpowiedź dodana.', 'success')
        return redirect(back_to_topic)
    finally:
        session_.close()
# ============================================================
# FORUM ADMIN ROUTES
# ============================================================
@app.route('/admin/forum')
@login_required
def admin_forum():
    """Forum moderation panel (admins only)."""
    if not current_user.is_admin:
        flash('Brak uprawnień do tej strony.', 'error')
        return redirect(url_for('forum_index'))

    session_ = SessionLocal()
    try:
        # All topics, newest first
        all_topics = session_.query(ForumTopic).order_by(
            ForumTopic.created_at.desc()
        ).all()
        # The 50 most recent replies
        latest_replies = session_.query(ForumReply).order_by(
            ForumReply.created_at.desc()
        ).limit(50).all()

        # Stats
        stats = {
            'total_topics': len(all_topics),
            'total_replies': session_.query(ForumReply).count(),
            'pinned_count': len([t for t in all_topics if t.is_pinned]),
            'locked_count': len([t for t in all_topics if t.is_locked]),
        }
        return render_template(
            'admin/forum.html',
            topics=all_topics,
            recent_replies=latest_replies,
            **stats
        )
    finally:
        session_.close()
@app.route('/admin/forum/topic/<int:topic_id>/pin', methods=['POST'])
@login_required
def admin_forum_pin(topic_id):
    """Flip a topic's pinned flag (admins only) and report the new state as JSON."""
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403

    session_ = SessionLocal()
    try:
        topic = session_.query(ForumTopic).filter(ForumTopic.id == topic_id).first()
        if not topic:
            return jsonify({'success': False, 'error': 'Temat nie istnieje'}), 404

        topic.is_pinned = not topic.is_pinned
        session_.commit()
        # Read the flag back once after the commit and reuse it below.
        pinned = topic.is_pinned

        logger.info(f"Admin {current_user.email} {'pinned' if pinned else 'unpinned'} topic #{topic_id}")
        return jsonify({
            'success': True,
            'is_pinned': pinned,
            'message': f"Temat {'przypięty' if pinned else 'odpięty'}"
        })
    finally:
        session_.close()
@app.route('/admin/forum/topic/<int:topic_id>/lock', methods=['POST'])
@login_required
def admin_forum_lock(topic_id):
    """Flip a topic's locked flag (admins only) and report the new state as JSON."""
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403

    session_ = SessionLocal()
    try:
        topic = session_.query(ForumTopic).filter(ForumTopic.id == topic_id).first()
        if not topic:
            return jsonify({'success': False, 'error': 'Temat nie istnieje'}), 404

        topic.is_locked = not topic.is_locked
        session_.commit()
        # Read the flag back once after the commit and reuse it below.
        locked = topic.is_locked

        logger.info(f"Admin {current_user.email} {'locked' if locked else 'unlocked'} topic #{topic_id}")
        return jsonify({
            'success': True,
            'is_locked': locked,
            'message': f"Temat {'zamknięty' if locked else 'otwarty'}"
        })
    finally:
        session_.close()
@app.route('/admin/forum/topic/<int:topic_id>/delete', methods=['POST'])
@login_required
def admin_forum_delete_topic(topic_id):
    """Delete a forum topic (admins only); answers with a JSON status."""
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403

    session_ = SessionLocal()
    try:
        doomed = session_.query(ForumTopic).filter(ForumTopic.id == topic_id).first()
        if not doomed:
            return jsonify({'success': False, 'error': 'Temat nie istnieje'}), 404

        # Keep the title for the audit log line before the row is gone.
        topic_title = doomed.title
        # NOTE(review): replies are expected to be removed by a cascade
        # configured on the model - confirm in the database module.
        session_.delete(doomed)
        session_.commit()

        logger.info(f"Admin {current_user.email} deleted topic #{topic_id}: {topic_title}")
        return jsonify({'success': True, 'message': 'Temat usunięty'})
    finally:
        session_.close()
@app.route('/admin/forum/reply/<int:reply_id>/delete', methods=['POST'])
@login_required
def admin_forum_delete_reply(reply_id):
    """Delete a single forum reply (admins only); answers with a JSON status."""
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403

    session_ = SessionLocal()
    try:
        doomed = session_.query(ForumReply).filter(ForumReply.id == reply_id).first()
        if not doomed:
            return jsonify({'success': False, 'error': 'Odpowiedź nie istnieje'}), 404

        # Remember the parent topic for the audit log line.
        topic_id = doomed.topic_id
        session_.delete(doomed)
        session_.commit()

        logger.info(f"Admin {current_user.email} deleted reply #{reply_id} from topic #{topic_id}")
        return jsonify({'success': True, 'message': 'Odpowiedź usunięta'})
    finally:
        session_.close()
# ============================================================
# RECOMMENDATIONS ADMIN ROUTES
# ============================================================
@app.route('/admin/recommendations')
@login_required
def admin_recommendations():
    """Recommendation moderation panel (admins only)."""
    if not current_user.is_admin:
        flash('Brak uprawnień do tej strony.', 'error')
        return redirect(url_for('index'))

    session_ = SessionLocal()
    try:
        newest_first = CompanyRecommendation.created_at.desc()

        def _with_status(status):
            # Query for recommendations currently in the given status.
            return session_.query(CompanyRecommendation).filter(
                CompanyRecommendation.status == status
            )

        # Every recommendation, newest first
        everything = session_.query(CompanyRecommendation).order_by(newest_first).all()
        # Those still waiting for moderation
        awaiting = _with_status('pending').order_by(newest_first).all()

        # Stats
        pending_count = len(awaiting)
        approved_count = _with_status('approved').count()
        rejected_count = _with_status('rejected').count()

        logger.info(f"Admin {current_user.email} accessed recommendations panel - {pending_count} pending")
        return render_template(
            'admin/recommendations.html',
            recommendations=everything,
            pending_recommendations=awaiting,
            total_recommendations=len(everything),
            pending_count=pending_count,
            approved_count=approved_count,
            rejected_count=rejected_count
        )
    finally:
        session_.close()
@app.route('/admin/recommendations/<int:recommendation_id>/approve', methods=['POST'])
@login_required
def admin_recommendation_approve(recommendation_id):
    """Mark a recommendation as approved (admins only); answers with JSON."""
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403

    session_ = SessionLocal()
    try:
        target = session_.query(CompanyRecommendation).filter(
            CompanyRecommendation.id == recommendation_id
        ).first()
        if not target:
            return jsonify({'success': False, 'error': 'Rekomendacja nie istnieje'}), 404

        # Record the decision and who made it; drop any earlier rejection reason.
        target.status = 'approved'
        target.moderated_by = current_user.id
        target.moderated_at = datetime.utcnow()
        target.rejection_reason = None
        session_.commit()

        logger.info(f"Admin {current_user.email} approved recommendation #{recommendation_id}")
        return jsonify({'success': True, 'message': 'Rekomendacja zatwierdzona'})
    finally:
        session_.close()
@app.route('/admin/recommendations/<int:recommendation_id>/reject', methods=['POST'])
@login_required
def admin_recommendation_reject(recommendation_id):
    """Mark a recommendation as rejected (admins only); answers with JSON.

    An optional ``reason`` may be supplied in a JSON object body or as a form
    field. Anything that is not a non-blank string is stored as no reason.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403

    db = SessionLocal()
    try:
        recommendation = db.query(CompanyRecommendation).filter(
            CompanyRecommendation.id == recommendation_id
        ).first()
        if not recommendation:
            return jsonify({'success': False, 'error': 'Rekomendacja nie istnieje'}), 404

        # Get optional rejection reason from request. A JSON body may be
        # null, a list or a scalar, and "reason" itself may be a non-string;
        # neither case may crash the request.
        if request.is_json:
            payload = request.get_json()
            raw_reason = payload.get('reason', '') if isinstance(payload, dict) else ''
        else:
            raw_reason = request.form.get('reason', '')
        rejection_reason = raw_reason.strip() if isinstance(raw_reason, str) else ''

        recommendation.status = 'rejected'
        recommendation.moderated_by = current_user.id
        recommendation.moderated_at = datetime.utcnow()
        recommendation.rejection_reason = rejection_reason or None
        db.commit()

        logger.info(f"Admin {current_user.email} rejected recommendation #{recommendation_id}")
        return jsonify({
            'success': True,
            'message': 'Rekomendacja odrzucona'
        })
    finally:
        db.close()
# ============================================================
# MEMBERSHIP FEES ADMIN
# ============================================================
# (month number, Polish month name without diacritics) pairs, January first.
MONTHS_PL = list(enumerate(
    (
        'Styczen', 'Luty', 'Marzec', 'Kwiecien',
        'Maj', 'Czerwiec', 'Lipiec', 'Sierpien',
        'Wrzesien', 'Pazdziernik', 'Listopad', 'Grudzien',
    ),
    start=1,
))
@app.route('/admin/fees')
@login_required
def admin_fees():
    """Render the admin panel for membership fee management.

    Query parameters:
        year: fee year to display (defaults to the current year).
        month: optional month number. When given, one row per active company
            is built for that month; otherwise each company gets a dict of
            its fee records keyed by month 1-12.
        status: optional filter ('paid', 'pending', 'overdue'); only applied
            when a month is selected. 'pending' also matches companies with
            no fee record ('brak').

    Non-admin users are redirected to the index page with a flash message.
    """
    if not current_user.is_admin:
        flash('Brak uprawnien do tej strony.', 'error')
        return redirect(url_for('index'))

    db = SessionLocal()
    try:
        # Filter parameters
        year = request.args.get('year', datetime.now().year, type=int)
        month = request.args.get('month', type=int)
        status_filter = request.args.get('status', '')

        # All active companies, alphabetically
        companies = db.query(Company).filter(Company.status == 'active').order_by(Company.name).all()
        total_companies = len(companies)

        # Fees for the selected period, keyed by (company_id, month)
        fee_query = db.query(MembershipFee).filter(MembershipFee.fee_year == year)
        if month:
            fee_query = fee_query.filter(MembershipFee.fee_month == month)
        fees = {(f.company_id, f.fee_month): f for f in fee_query.all()}

        if month:
            companies_fees = []
            for company in companies:
                fee = fees.get((company.id, month))
                companies_fees.append({
                    'company': company,
                    'fee': fee,
                    'status': fee.status if fee else 'brak'
                })

            # Summary stats are computed BEFORE the status filter is applied,
            # so the header numbers describe the whole month rather than
            # whatever subset the admin is currently looking at.
            month_fees = [cf['fee'] for cf in companies_fees if cf['fee']]
            paid_count = sum(1 for f in month_fees if f.status == 'paid')
            pending_count = total_companies - paid_count
            total_due = sum((float(f.amount) for f in month_fees), 0.0)
            total_paid = sum((float(f.amount_paid or 0) for f in month_fees), 0.0)

            # Status filter; unknown values leave the list untouched.
            wanted_statuses = {
                'paid': ('paid',),
                'pending': ('pending', 'brak'),
                'overdue': ('overdue',),
            }.get(status_filter)
            if wanted_statuses:
                companies_fees = [cf for cf in companies_fees if cf['status'] in wanted_statuses]
        else:
            # Year view: every company with its fee record (or None) per month
            companies_fees = [
                {
                    'company': company,
                    'months': {m: fees.get((company.id, m)) for m in range(1, 13)},
                }
                for company in companies
            ]

            all_fees = list(fees.values())
            paid_count = sum(1 for f in all_fees if f.status == 'paid')
            pending_count = len(all_fees) - paid_count
            total_due = sum((float(f.amount) for f in all_fees), 0.0)
            total_paid = sum((float(f.amount_paid or 0) for f in all_fees), 0.0)

        # Default fee amount: the open-ended global config, else 100.00
        fee_config = db.query(MembershipFeeConfig).filter(
            MembershipFeeConfig.scope == 'global',
            MembershipFeeConfig.valid_until == None
        ).first()
        default_fee = float(fee_config.monthly_amount) if fee_config else 100.00

        return render_template(
            'admin/fees.html',
            companies_fees=companies_fees,
            year=year,
            month=month,
            status_filter=status_filter,
            total_companies=total_companies,
            paid_count=paid_count,
            pending_count=pending_count,
            total_due=total_due,
            total_paid=total_paid,
            default_fee=default_fee,
            years=list(range(2024, datetime.now().year + 2)),
            months=MONTHS_PL
        )
    finally:
        db.close()
@app.route('/admin/fees/generate', methods=['POST'])
@login_required
def admin_fees_generate():
    """Create pending fee records for every active company for one month.

    Form fields:
        year: fee year (required).
        month: fee month, 1-12 (required).

    Companies that already have a record for the given year/month are
    skipped. Returns JSON: 403 for non-admins, 400 for missing or invalid
    input, 500 (after rollback) on unexpected errors, otherwise a success
    message with the number of records created.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnien'}), 403

    db = SessionLocal()
    try:
        year = request.form.get('year', type=int)
        month = request.form.get('month', type=int)

        if not year or not month:
            return jsonify({'success': False, 'error': 'Brak roku lub miesiaca'}), 400
        if not 1 <= month <= 12:
            return jsonify({'success': False, 'error': 'Nieprawidlowy miesiac'}), 400

        # Default fee amount: the open-ended global config, else 100.00
        fee_config = db.query(MembershipFeeConfig).filter(
            MembershipFeeConfig.scope == 'global',
            MembershipFeeConfig.valid_until == None
        ).first()
        default_fee = fee_config.monthly_amount if fee_config else 100.00

        # One query for all companies that already have a record this period,
        # instead of one existence check per company.
        already_billed = {
            row[0]
            for row in db.query(MembershipFee.company_id).filter(
                MembershipFee.fee_year == year,
                MembershipFee.fee_month == month
            ).all()
        }

        companies = db.query(Company).filter(Company.status == 'active').all()

        created = 0
        for company in companies:
            if company.id in already_billed:
                continue
            db.add(MembershipFee(
                company_id=company.id,
                fee_year=year,
                fee_month=month,
                amount=default_fee,
                status='pending'
            ))
            created += 1

        db.commit()

        return jsonify({
            'success': True,
            'message': f'Utworzono {created} rekordow skladek'
        })
    except Exception as e:
        db.rollback()
        logger.error(f"Error generating fees: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()
@app.route('/admin/fees/<int:fee_id>/mark-paid', methods=['POST'])
@login_required
def admin_fees_mark_paid(fee_id):
    """Record a payment against a single membership fee.

    Form fields (all optional):
        amount_paid: amount received; when missing (or 0) the full fee
            amount is recorded.
        payment_date: ``YYYY-MM-DD``; defaults to today.
        payment_method: defaults to 'transfer'.
        payment_reference, notes: free text, default to empty strings.

    The fee status becomes 'paid' when the recorded amount covers the fee
    and 'partial' when it is positive but lower. Returns JSON: 403 for
    non-admins, 404 when the fee does not exist, 400 for an invalid amount
    or date, 500 (after rollback) on unexpected errors.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnien'}), 403

    db = SessionLocal()
    try:
        fee = db.query(MembershipFee).filter(MembershipFee.id == fee_id).first()
        if not fee:
            return jsonify({'success': False, 'error': 'Nie znaleziono skladki'}), 404

        amount_paid = request.form.get('amount_paid', type=float)
        payment_date = request.form.get('payment_date')
        payment_method = request.form.get('payment_method', 'transfer')
        payment_reference = request.form.get('payment_reference', '')
        notes = request.form.get('notes', '')

        # Reject negative, NaN and infinite amounts (every comparison with
        # NaN is False, so the chained comparison filters it out as well).
        if amount_paid is not None and not (0 <= amount_paid < float('inf')):
            return jsonify({'success': False, 'error': 'Nieprawidlowa kwota'}), 400

        # A malformed date is a client error, not a server error.
        if payment_date:
            try:
                paid_on = datetime.strptime(payment_date, '%Y-%m-%d').date()
            except ValueError:
                return jsonify({'success': False, 'error': 'Nieprawidlowy format daty'}), 400
        else:
            paid_on = datetime.now().date()

        # A missing or zero amount means "paid in full".
        fee.amount_paid = amount_paid or float(fee.amount)
        fee.payment_date = paid_on
        fee.payment_method = payment_method
        fee.payment_reference = payment_reference
        fee.notes = notes
        fee.recorded_by = current_user.id
        fee.recorded_at = datetime.now()

        # Status follows the recorded amount
        if fee.amount_paid >= float(fee.amount):
            fee.status = 'paid'
        elif fee.amount_paid > 0:
            fee.status = 'partial'

        db.commit()

        return jsonify({
            'success': True,
            'message': 'Skladka zostala zarejestrowana'
        })
    except Exception as e:
        db.rollback()
        logger.error(f"Error marking fee as paid: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()
@app.route('/admin/fees/bulk-mark-paid', methods=['POST'])
@login_required
def admin_fees_bulk_mark_paid():
    """Mark several membership fees as fully paid in one request.

    Form fields:
        fee_ids[]: repeated integer ids of the fees to update.

    Fees that are already 'paid' and ids that do not exist are ignored.
    Returns JSON: 403 for non-admins, 400 when no ids were supplied, 500
    (after rollback) on unexpected errors, otherwise a success message with
    the number of updated records.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnien'}), 403

    db = SessionLocal()
    try:
        fee_ids = request.form.getlist('fee_ids[]', type=int)
        if not fee_ids:
            return jsonify({'success': False, 'error': 'Brak wybranych skladek'}), 400

        # Load every requested fee with a single IN query rather than one
        # query per id.
        selected = db.query(MembershipFee).filter(MembershipFee.id.in_(fee_ids)).all()

        now = datetime.now()
        updated = 0
        for fee in selected:
            if fee.status == 'paid':
                continue
            fee.status = 'paid'
            fee.amount_paid = fee.amount
            fee.payment_date = now.date()
            fee.recorded_by = current_user.id
            fee.recorded_at = now
            updated += 1

        db.commit()

        return jsonify({
            'success': True,
            'message': f'Zaktualizowano {updated} rekordow'
        })
    except Exception as e:
        db.rollback()
        logger.error(f"Error in bulk action: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        db.close()
@app.route('/admin/fees/export')
@login_required
def admin_fees_export():
    """Download membership fees for a year (optionally one month) as CSV.

    Query parameters:
        year: fee year (defaults to the current year).
        month: optional month number to narrow the export.

    Non-admin users are redirected with a flash message. Rows are ordered
    by company name, then by month.
    """
    if not current_user.is_admin:
        flash('Brak uprawnien.', 'error')
        return redirect(url_for('admin_fees'))

    import csv
    from io import StringIO

    db = SessionLocal()
    try:
        year = request.args.get('year', datetime.now().year, type=int)
        month = request.args.get('month', type=int)

        fee_query = db.query(MembershipFee).join(Company).filter(
            MembershipFee.fee_year == year
        )
        if month:
            fee_query = fee_query.filter(MembershipFee.fee_month == month)
        records = fee_query.order_by(Company.name, MembershipFee.fee_month).all()

        # Build the CSV in memory: header row first, then one row per fee.
        buffer = StringIO()
        csv_out = csv.writer(buffer)
        csv_out.writerow([
            'Firma', 'NIP', 'Rok', 'Miesiac', 'Kwota', 'Zaplacono',
            'Status', 'Data platnosci', 'Metoda', 'Referencja', 'Notatki'
        ])
        csv_out.writerows(
            [
                record.company.name,
                record.company.nip,
                record.fee_year,
                record.fee_month,
                record.amount,
                record.amount_paid,
                record.status,
                record.payment_date,
                record.payment_method,
                record.payment_reference,
                record.notes,
            ]
            for record in records
        )

        disposition = f'attachment; filename=skladki_{year}_{month or "all"}.csv'
        return Response(
            buffer.getvalue(),
            mimetype='text/csv',
            headers={'Content-Disposition': disposition}
        )
    finally:
        db.close()
# ============================================================
# CALENDAR ROUTES
# ============================================================
@app.route('/kalendarz')
@login_required
def calendar_index():
    """Show the Norda Biznes event calendar.

    Renders all events dated today or later (earliest first) together with
    the five most recent past events.
    """
    from datetime import date

    session_db = SessionLocal()
    try:
        current_day = date.today()

        # Upcoming events, soonest first
        upcoming_query = session_db.query(NordaEvent).filter(NordaEvent.event_date >= current_day)
        upcoming_events = upcoming_query.order_by(NordaEvent.event_date.asc()).all()

        # Past events, limited to the five most recent
        past_query = session_db.query(NordaEvent).filter(NordaEvent.event_date < current_day)
        past_events = past_query.order_by(NordaEvent.event_date.desc()).limit(5).all()

        return render_template(
            'calendar/index.html',
            upcoming_events=upcoming_events,
            past_events=past_events,
            today=current_day,
        )
    finally:
        session_db.close()
@app.route('/kalendarz/<int:event_id>')
@login_required
def calendar_event(event_id):
    """Show the details page of a single event.

    Redirects to the calendar with a flash message when the event does not
    exist. The template also receives the current user's attendance record
    for the event (or None).
    """
    session_db = SessionLocal()
    try:
        found = session_db.query(NordaEvent).filter(NordaEvent.id == event_id).first()
        if not found:
            flash('Wydarzenie nie istnieje.', 'error')
            return redirect(url_for('calendar_index'))

        # Is the current user signed up for this event?
        attendance = (
            session_db.query(EventAttendee)
            .filter(
                EventAttendee.event_id == event_id,
                EventAttendee.user_id == current_user.id,
            )
            .first()
        )

        return render_template(
            'calendar/event.html',
            event=found,
            user_attending=attendance,
        )
    finally:
        session_db.close()
@app.route('/kalendarz/<int:event_id>/rsvp', methods=['POST'])
@login_required
def calendar_rsvp(event_id):
    """Toggle the current user's attendance for an event.

    If the user is already signed up, the sign-up is removed; otherwise a
    confirmed attendance is created, unless the event has a ``max_attendees``
    limit that has been reached. Returns JSON: 404 when the event does not
    exist, 400 when it is full, otherwise the performed action and the
    event's attendee count.
    """
    session_db = SessionLocal()
    try:
        target_event = session_db.query(NordaEvent).filter(NordaEvent.id == event_id).first()
        if not target_event:
            return jsonify({'success': False, 'error': 'Wydarzenie nie istnieje'}), 404

        # Look for an existing sign-up of this user
        signup = (
            session_db.query(EventAttendee)
            .filter(
                EventAttendee.event_id == event_id,
                EventAttendee.user_id == current_user.id,
            )
            .first()
        )

        if signup:
            # Already attending -> sign out
            session_db.delete(signup)
            session_db.commit()
            return jsonify({
                'success': True,
                'action': 'removed',
                'message': 'Wypisano z wydarzenia',
                'attendee_count': target_event.attendee_count
            })

        # Not attending yet -> sign up, provided there is still room
        if target_event.max_attendees and target_event.attendee_count >= target_event.max_attendees:
            return jsonify({'success': False, 'error': 'Brak wolnych miejsc'}), 400

        session_db.add(EventAttendee(
            event_id=event_id,
            user_id=current_user.id,
            status='confirmed'
        ))
        session_db.commit()
        return jsonify({
            'success': True,
            'action': 'added',
            'message': 'Zapisano na wydarzenie',
            'attendee_count': target_event.attendee_count
        })
    finally:
        session_db.close()
@app.route('/admin/kalendarz')
@login_required
def admin_calendar():
    """Admin panel listing all events, newest event date first.

    Non-admin users are redirected to the public calendar with a flash
    message.
    """
    if not current_user.is_admin:
        flash('Brak uprawnień.', 'error')
        return redirect(url_for('calendar_index'))

    session_db = SessionLocal()
    try:
        all_events = (
            session_db.query(NordaEvent)
            .order_by(NordaEvent.event_date.desc())
            .all()
        )
        return render_template('calendar/admin.html', events=all_events)
    finally:
        session_db.close()
@app.route('/admin/kalendarz/nowy', methods=['GET', 'POST'])
@login_required
def admin_calendar_new():
    """Create a new event (admin only).

    GET renders the empty form. POST reads the form fields, validates them
    and stores the event. Title and date are required; the date must be
    ``YYYY-MM-DD`` and the optional start/end times ``HH:MM``. On a
    validation failure the form is re-rendered with a flash message; on
    success the admin is redirected to the event list.
    """
    if not current_user.is_admin:
        flash('Brak uprawnień.', 'error')
        return redirect(url_for('calendar_index'))

    if request.method != 'POST':
        return render_template('calendar/admin_new.html')

    title = sanitize_input(request.form.get('title', ''), 255)
    description = request.form.get('description', '').strip()
    event_type = request.form.get('event_type', 'meeting')
    event_date_str = request.form.get('event_date', '')
    time_start_str = request.form.get('time_start', '')
    time_end_str = request.form.get('time_end', '')
    location = sanitize_input(request.form.get('location', ''), 500)
    location_url = request.form.get('location_url', '').strip()
    speaker_name = sanitize_input(request.form.get('speaker_name', ''), 255)
    max_attendees = request.form.get('max_attendees', type=int)

    if not title or not event_date_str:
        flash('Tytuł i data są wymagane.', 'error')
        return render_template('calendar/admin_new.html')

    # Parse date/time up front so malformed input yields a form error
    # instead of an unhandled ValueError (HTTP 500).
    try:
        event_date = datetime.strptime(event_date_str, '%Y-%m-%d').date()
        time_start = datetime.strptime(time_start_str, '%H:%M').time() if time_start_str else None
        time_end = datetime.strptime(time_end_str, '%H:%M').time() if time_end_str else None
    except ValueError:
        flash('Nieprawidłowy format daty lub godziny.', 'error')
        return render_template('calendar/admin_new.html')

    db = SessionLocal()
    try:
        event = NordaEvent(
            title=title,
            description=description,
            event_type=event_type,
            event_date=event_date,
            time_start=time_start,
            time_end=time_end,
            location=location,
            location_url=location_url,
            speaker_name=speaker_name,
            max_attendees=max_attendees,
            created_by=current_user.id
        )
        db.add(event)
        db.commit()
    except Exception:
        # Leave the session clean before the error propagates.
        db.rollback()
        raise
    finally:
        db.close()

    flash('Wydarzenie utworzone.', 'success')
    return redirect(url_for('admin_calendar'))
@app.route('/admin/kalendarz/<int:event_id>/delete', methods=['POST'])
@login_required
def admin_calendar_delete(event_id):
    """Delete an event (admin only).

    Returns JSON: 403 for non-admin users, 404 when the event does not
    exist, otherwise a success message.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403

    session_db = SessionLocal()
    try:
        doomed = session_db.query(NordaEvent).filter(NordaEvent.id == event_id).first()
        if not doomed:
            return jsonify({'success': False, 'error': 'Wydarzenie nie istnieje'}), 404

        session_db.delete(doomed)
        session_db.commit()
        return jsonify(success=True, message='Wydarzenie usunięte')
    finally:
        session_db.close()
# ============================================================
# PRIVATE MESSAGES ROUTES
# ============================================================
@app.route('/wiadomosci')
@login_required
def messages_inbox():
    """Show the current user's inbox, 20 messages per page, newest first.

    Query parameters:
        page: 1-based page number (defaults to 1; values below 1 are
            treated as 1).

    The template also receives the total page count and the number of
    unread messages.
    """
    # Clamp to the first page: page <= 0 would produce a negative OFFSET.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = 20

    db = SessionLocal()
    try:
        query = db.query(PrivateMessage).filter(
            PrivateMessage.recipient_id == current_user.id
        ).order_by(PrivateMessage.created_at.desc())

        total = query.count()
        messages = query.limit(per_page).offset((page - 1) * per_page).all()

        unread_count = db.query(PrivateMessage).filter(
            PrivateMessage.recipient_id == current_user.id,
            PrivateMessage.is_read == False
        ).count()

        return render_template('messages/inbox.html',
            messages=messages,
            page=page,
            # Ceiling division: number of pages needed for `total` rows
            total_pages=(total + per_page - 1) // per_page,
            unread_count=unread_count
        )
    finally:
        db.close()
@app.route('/wiadomosci/wyslane')
@login_required
def messages_sent():
    """Show messages sent by the current user, 20 per page, newest first.

    Query parameters:
        page: 1-based page number (defaults to 1; values below 1 are
            treated as 1).
    """
    # Clamp to the first page: page <= 0 would produce a negative OFFSET.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = 20

    db = SessionLocal()
    try:
        query = db.query(PrivateMessage).filter(
            PrivateMessage.sender_id == current_user.id
        ).order_by(PrivateMessage.created_at.desc())

        total = query.count()
        messages = query.limit(per_page).offset((page - 1) * per_page).all()

        return render_template('messages/sent.html',
            messages=messages,
            page=page,
            # Ceiling division: number of pages needed for `total` rows
            total_pages=(total + per_page - 1) // per_page
        )
    finally:
        db.close()
@app.route('/wiadomosci/nowa')
@login_required
def messages_new():
    """Render the compose-message form.

    Query parameters:
        to: optional user id used to preselect the recipient.

    The recipient picker lists active, verified users other than the
    current user, ordered by name.
    """
    preselected_id = request.args.get('to', type=int)

    session_db = SessionLocal()
    try:
        # Users offered in the recipient picker
        selectable_users = (
            session_db.query(User)
            .filter(
                User.is_active == True,
                User.is_verified == True,
                User.id != current_user.id,
            )
            .order_by(User.name)
            .all()
        )

        preselected = (
            session_db.query(User).filter(User.id == preselected_id).first()
            if preselected_id
            else None
        )

        return render_template(
            'messages/compose.html',
            users=selectable_users,
            recipient=preselected,
        )
    finally:
        session_db.close()
@app.route('/wiadomosci/wyslij', methods=['POST'])
@login_required
def messages_send():
    """Send a private message from the current user.

    Form fields:
        recipient_id: id of the receiving user (required).
        subject: optional subject, passed through ``sanitize_input`` with a
            limit of 255.
        content: message body (required, stripped).

    Redirects back to the compose form with a flash message when input is
    missing or the recipient does not exist, otherwise to the sent folder.
    """
    form = request.form
    recipient_id = form.get('recipient_id', type=int)
    subject = sanitize_input(form.get('subject', ''), 255)
    body = form.get('content', '').strip()

    if not (recipient_id and body):
        flash('Odbiorca i treść są wymagane.', 'error')
        return redirect(url_for('messages_new'))

    session_db = SessionLocal()
    try:
        addressee = session_db.query(User).filter(User.id == recipient_id).first()
        if not addressee:
            flash('Odbiorca nie istnieje.', 'error')
            return redirect(url_for('messages_new'))

        session_db.add(PrivateMessage(
            sender_id=current_user.id,
            recipient_id=recipient_id,
            subject=subject,
            content=body
        ))
        session_db.commit()

        flash('Wiadomość wysłana.', 'success')
        return redirect(url_for('messages_sent'))
    finally:
        session_db.close()
@app.route('/wiadomosci/<int:message_id>')
@login_required
def messages_view(message_id):
    """Display a single private message.

    Only the sender and the recipient may view a message; anyone else is
    redirected to the inbox with a flash message. When the recipient opens
    an unread message it is marked as read.
    """
    session_db = SessionLocal()
    try:
        msg = (
            session_db.query(PrivateMessage)
            .filter(PrivateMessage.id == message_id)
            .first()
        )
        if not msg:
            flash('Wiadomość nie istnieje.', 'error')
            return redirect(url_for('messages_inbox'))

        # Access check: the viewer must be one of the two parties
        viewer_is_recipient = msg.recipient_id == current_user.id
        if not viewer_is_recipient and msg.sender_id != current_user.id:
            flash('Brak dostępu do tej wiadomości.', 'error')
            return redirect(url_for('messages_inbox'))

        # Mark as read the first time the recipient opens it
        if viewer_is_recipient and not msg.is_read:
            msg.is_read = True
            msg.read_at = datetime.now()
            session_db.commit()

        return render_template('messages/view.html', message=msg)
    finally:
        session_db.close()
@app.route('/wiadomosci/<int:message_id>/odpowiedz', methods=['POST'])
@login_required
def messages_reply(message_id):
    """Reply to a private message.

    Form fields:
        content: reply body (required, stripped).

    Only the sender or the recipient of the original message may reply;
    anyone else is redirected to the inbox, mirroring the access check in
    ``messages_view``. The reply goes to the other party of the original
    message and is linked to it through ``parent_id``.
    """
    content = request.form.get('content', '').strip()

    if not content:
        flash('Treść jest wymagana.', 'error')
        return redirect(url_for('messages_view', message_id=message_id))

    db = SessionLocal()
    try:
        original = db.query(PrivateMessage).filter(
            PrivateMessage.id == message_id
        ).first()

        if not original:
            flash('Wiadomość nie istnieje.', 'error')
            return redirect(url_for('messages_inbox'))

        # Access check: without it any logged-in user could reply to an
        # arbitrary message id and get its subject echoed back as "Re: ...".
        if current_user.id not in (original.sender_id, original.recipient_id):
            flash('Brak dostępu do tej wiadomości.', 'error')
            return redirect(url_for('messages_inbox'))

        # Address the reply to the other party of the original message
        recipient_id = original.sender_id if original.sender_id != current_user.id else original.recipient_id

        reply = PrivateMessage(
            sender_id=current_user.id,
            recipient_id=recipient_id,
            subject=f"Re: {original.subject}" if original.subject else None,
            content=content,
            parent_id=message_id
        )
        db.add(reply)
        db.commit()

        flash('Odpowiedź wysłana.', 'success')
        return redirect(url_for('messages_view', message_id=message_id))
    finally:
        db.close()
@app.route('/api/messages/unread-count')
@login_required
def api_unread_count():
    """API: return the number of unread private messages for the current user.

    Responds with JSON of the form ``{"count": <int>}``.
    """
    session_db = SessionLocal()
    try:
        unread_query = session_db.query(PrivateMessage).filter(
            PrivateMessage.recipient_id == current_user.id,
            PrivateMessage.is_read == False,
        )
        return jsonify(count=unread_query.count())
    finally:
        session_db.close()
# ============================================================
# NOTIFICATIONS API ROUTES
# ============================================================
@app.route('/api/notifications')
@login_required
def api_notifications():
    """API: list the current user's notifications, newest first.

    Query parameters:
        limit: maximum number of items (default 20; negative values are
            treated as 0).
        offset: number of items to skip (default 0; negative values are
            treated as 0).
        unread_only: 'true' to return unread notifications only.

    The JSON response contains the serialized notifications, ``total`` (the
    number of rows matching the filter, ignoring limit/offset) and
    ``unread_count`` (all unread notifications of the user).
    """
    # Clamp paging values so a negative number never reaches the query.
    limit = max(request.args.get('limit', 20, type=int), 0)
    offset = max(request.args.get('offset', 0, type=int), 0)
    unread_only = request.args.get('unread_only', 'false').lower() == 'true'

    db = SessionLocal()
    try:
        query = db.query(UserNotification).filter(
            UserNotification.user_id == current_user.id
        )

        if unread_only:
            query = query.filter(UserNotification.is_read == False)

        # Most recent first
        query = query.order_by(UserNotification.created_at.desc())

        total = query.count()
        notifications = query.limit(limit).offset(offset).all()

        unread_count = db.query(UserNotification).filter(
            UserNotification.user_id == current_user.id,
            UserNotification.is_read == False
        ).count()

        return jsonify({
            'success': True,
            'notifications': [
                {
                    'id': n.id,
                    'title': n.title,
                    'message': n.message,
                    'notification_type': n.notification_type,
                    'related_type': n.related_type,
                    'related_id': n.related_id,
                    'action_url': n.action_url,
                    'is_read': n.is_read,
                    'created_at': n.created_at.isoformat() if n.created_at else None
                }
                for n in notifications
            ],
            'total': total,
            'unread_count': unread_count
        })
    finally:
        db.close()
@app.route('/api/notifications/<int:notification_id>/read', methods=['POST'])
@login_required
def api_notification_mark_read(notification_id):
    """API: mark one of the current user's notifications as read.

    The lookup is restricted to notifications owned by the current user.
    Returns 404 JSON when no such notification is found.
    """
    session_db = SessionLocal()
    try:
        owned = (
            session_db.query(UserNotification)
            .filter(
                UserNotification.id == notification_id,
                UserNotification.user_id == current_user.id,
            )
            .first()
        )
        if not owned:
            return jsonify({'success': False, 'error': 'Powiadomienie nie znalezione'}), 404

        owned.mark_as_read()
        session_db.commit()

        return jsonify(success=True, message='Oznaczono jako przeczytane')
    finally:
        session_db.close()
@app.route('/api/notifications/read-all', methods=['POST'])
@login_required
def api_notifications_mark_all_read():
    """API: mark every unread notification of the current user as read.

    Uses a single bulk UPDATE that sets ``is_read`` and ``read_at``; the
    JSON response reports how many rows were updated.
    """
    session_db = SessionLocal()
    try:
        unread = session_db.query(UserNotification).filter(
            UserNotification.user_id == current_user.id,
            UserNotification.is_read == False,
        )
        changes = {
            UserNotification.is_read: True,
            UserNotification.read_at: datetime.now(),
        }
        affected = unread.update(changes)
        session_db.commit()

        return jsonify({
            'success': True,
            'message': f'Oznaczono {affected} powiadomien jako przeczytane',
            'count': affected
        })
    finally:
        session_db.close()
@app.route('/api/notifications/unread-count')
@login_required
def api_notifications_unread_count():
    """API: return the number of unread notifications for the current user.

    Responds with JSON of the form ``{"count": <int>}``.
    """
    session_db = SessionLocal()
    try:
        unread_query = session_db.query(UserNotification).filter(
            UserNotification.user_id == current_user.id,
            UserNotification.is_read == False,
        )
        return jsonify(count=unread_query.count())
    finally:
        session_db.close()
# ============================================================
# RECOMMENDATIONS API ROUTES
# ============================================================
@app.route('/api/recommendations/<int:company_id>', methods=['GET'])
@login_required
def api_get_recommendations(company_id):
    """API: list all approved recommendations for a company, newest first.

    Each entry carries the recommendation text plus details of the
    recommending user; the recommender's email and phone are included only
    when the recommendation has ``show_contact`` set. Returns 404 JSON when
    the company does not exist and 500 JSON on unexpected errors.
    """
    db = SessionLocal()
    try:
        # Verify company exists
        company = db.query(Company).filter_by(id=company_id).first()
        if not company:
            return jsonify({
                'success': False,
                'error': 'Firma nie znaleziona'
            }), 404

        recommendations = db.query(CompanyRecommendation).filter_by(
            company_id=company_id,
            status='approved'
        ).join(User, CompanyRecommendation.user_id == User.id).order_by(CompanyRecommendation.created_at.desc()).all()

        # Load all recommenders and their companies with two batched queries
        # instead of two queries per recommendation.
        user_ids = list({rec.user_id for rec in recommendations})
        users_by_id = {}
        if user_ids:
            users_by_id = {
                u.id: u for u in db.query(User).filter(User.id.in_(user_ids)).all()
            }

        recommender_company_ids = list({
            u.company_id for u in users_by_id.values() if u.company_id
        })
        companies_by_id = {}
        if recommender_company_ids:
            companies_by_id = {
                c.id: c
                for c in db.query(Company).filter(Company.id.in_(recommender_company_ids)).all()
            }

        result = []
        for rec in recommendations:
            recommender = users_by_id.get(rec.user_id)
            recommender_company = None
            if recommender and recommender.company_id:
                recommender_company = companies_by_id.get(recommender.company_id)

            # Contact details are exposed only with the recommender's consent.
            expose_contact = bool(recommender and rec.show_contact)

            result.append({
                'id': rec.id,
                'recommendation_text': rec.recommendation_text,
                'service_category': rec.service_category,
                'created_at': rec.created_at.isoformat() if rec.created_at else None,
                'updated_at': rec.updated_at.isoformat() if rec.updated_at else None,
                'recommender': {
                    'name': recommender.full_name if recommender else '[Użytkownik usunięty]',
                    'email': recommender.email if expose_contact else None,
                    'phone': recommender.phone if expose_contact else None,
                    'company_id': recommender_company.id if recommender_company else None,
                    'company_name': recommender_company.name if recommender_company else None,
                    'company_slug': recommender_company.slug if recommender_company else None
                }
            })

        return jsonify({
            'success': True,
            'company_id': company_id,
            'company_name': company.name,
            'recommendations': result,
            'count': len(result)
        })

    except Exception as e:
        logger.error(f"Error fetching recommendations for company {company_id}: {e}")
        return jsonify({
            'success': False,
            'error': 'Wystąpił błąd podczas pobierania rekomendacji'
        }), 500
    finally:
        db.close()
@app.route('/api/recommendations/create', methods=['POST'])
@login_required
def api_create_recommendation():
    """API: create a new recommendation for a company.

    Expects a JSON object with:
        company_id: id of the recommended company (integer or numeric
            string).
        recommendation_text: 50-2000 characters after stripping.
        service_category: optional string; blank or null means none.
        show_contact: optional, defaults to True.

    The recommendation is stored with status 'pending'. Active users of the
    recommended company (other than the author) receive a notification.

    Returns JSON: 400 for invalid input, self-recommendation or a duplicate,
    403 for unverified users, 404 when the company is not active or does not
    exist, 500 (after rollback) on unexpected errors, 201 on success.
    """
    db = SessionLocal()
    try:
        # silent=True: a missing or malformed JSON body yields None here and
        # is reported as 400 below, instead of raising inside this try block
        # and being turned into a 500 by the generic handler.
        data = request.get_json(silent=True)

        if not data or not isinstance(data, dict):
            return jsonify({
                'success': False,
                'error': 'Brak danych'
            }), 400

        raw_company_id = data.get('company_id')
        raw_text = data.get('recommendation_text')
        raw_category = data.get('service_category')
        show_contact = data.get('show_contact', True)

        # JSON null or non-string values are treated as "not provided"
        # rather than crashing on .strip().
        recommendation_text = raw_text.strip() if isinstance(raw_text, str) else ''
        service_category = (raw_category.strip() or None) if isinstance(raw_category, str) else None

        # Validate required fields
        if not raw_company_id:
            return jsonify({
                'success': False,
                'error': 'Brak ID firmy'
            }), 400

        # Normalize the id to int. A string id such as "5" would otherwise
        # never compare equal to current_user.company_id, letting a user
        # slip past the self-recommendation check below.
        try:
            if isinstance(raw_company_id, bool) or not isinstance(raw_company_id, (int, str)):
                raise ValueError(raw_company_id)
            company_id = int(raw_company_id)
        except ValueError:
            return jsonify({
                'success': False,
                'error': 'Nieprawidłowe ID firmy'
            }), 400

        if not recommendation_text:
            return jsonify({
                'success': False,
                'error': 'Treść rekomendacji jest wymagana'
            }), 400

        # Validate text length (50-2000 characters)
        if len(recommendation_text) < 50:
            return jsonify({
                'success': False,
                'error': 'Rekomendacja musi mieć co najmniej 50 znaków'
            }), 400

        if len(recommendation_text) > 2000:
            return jsonify({
                'success': False,
                'error': 'Rekomendacja nie może przekraczać 2000 znaków'
            }), 400

        # Only verified users may recommend
        if not current_user.is_verified:
            return jsonify({
                'success': False,
                'error': 'Tylko zweryfikowani użytkownicy mogą dodawać rekomendacje'
            }), 403

        # Verify company exists and is active
        company = db.query(Company).filter_by(id=company_id, status='active').first()
        if not company:
            return jsonify({
                'success': False,
                'error': 'Firma nie znaleziona'
            }), 404

        # Prevent self-recommendation
        if current_user.company_id and current_user.company_id == company_id:
            return jsonify({
                'success': False,
                'error': 'Nie możesz polecać własnej firmy'
            }), 400

        # A user can only have one recommendation per company
        existing_rec = db.query(CompanyRecommendation).filter_by(
            user_id=current_user.id,
            company_id=company_id
        ).first()

        if existing_rec:
            return jsonify({
                'success': False,
                'error': 'Już poleciłeś tę firmę. Możesz edytować swoją istniejącą rekomendację.'
            }), 400

        # Create recommendation
        recommendation = CompanyRecommendation(
            company_id=company_id,
            user_id=current_user.id,
            recommendation_text=recommendation_text,
            service_category=service_category,
            show_contact=show_contact,
            status='pending'  # Start as pending for moderation
        )

        db.add(recommendation)
        db.commit()
        db.refresh(recommendation)

        # Notify the active users associated with the recommended company
        company_users = db.query(User).filter_by(company_id=company_id, is_active=True).all()
        for company_user in company_users:
            if company_user.id != current_user.id:
                notification = UserNotification(
                    user_id=company_user.id,
                    notification_type='new_recommendation',
                    title='Nowa rekomendacja',
                    message=f'{current_user.name or current_user.email} polecił Twoją firmę: {company.name}',
                    action_url=f'/company/{company.slug}#recommendations',
                    related_id=recommendation.id
                )
                db.add(notification)

        db.commit()

        logger.info(f"Recommendation created: user {current_user.id} -> company {company_id}, ID {recommendation.id}")

        return jsonify({
            'success': True,
            'message': 'Rekomendacja została utworzona i oczekuje na moderację',
            'recommendation_id': recommendation.id,
            'status': recommendation.status
        }), 201

    except Exception as e:
        logger.error(f"Error creating recommendation: {e}")
        db.rollback()
        return jsonify({
            'success': False,
            'error': 'Wystąpił błąd podczas tworzenia rekomendacji'
        }), 500
    finally:
        db.close()
@app.route('/api/recommendations/<int:rec_id>/edit', methods=['POST'])
@login_required
def api_edit_recommendation(rec_id):
    """API: edit an existing recommendation (owner or admin only).

    Expects a JSON object; every key is optional:
        recommendation_text: new text, 50-2000 characters after stripping.
            Ignored when missing or blank.
        service_category: new category; blank or null clears it.
        show_contact: new contact-visibility flag.

    Returns JSON: 404 when the recommendation does not exist, 403 when the
    caller is neither its author nor an admin, 400 for invalid input, 500
    (after rollback) on unexpected errors.
    """
    db = SessionLocal()
    try:
        recommendation = db.query(CompanyRecommendation).filter_by(id=rec_id).first()

        if not recommendation:
            return jsonify({
                'success': False,
                'error': 'Rekomendacja nie znaleziona'
            }), 404

        # Authorization: the author or an admin
        if recommendation.user_id != current_user.id and not current_user.is_admin:
            return jsonify({
                'success': False,
                'error': 'Brak uprawnień do edycji tej rekomendacji'
            }), 403

        # silent=True: a missing or malformed JSON body yields None here and
        # is reported as 400 below, instead of raising inside this try block
        # and being turned into a 500 by the generic handler.
        data = request.get_json(silent=True)

        if not data or not isinstance(data, dict):
            return jsonify({
                'success': False,
                'error': 'Brak danych'
            }), 400

        raw_text = data.get('recommendation_text')
        raw_category = data.get('service_category')
        show_contact = data.get('show_contact', recommendation.show_contact)

        # JSON null (e.g. {"service_category": null} to clear the category)
        # or non-string values must not crash on .strip().
        recommendation_text = raw_text.strip() if isinstance(raw_text, str) else ''
        service_category = (raw_category.strip() or None) if isinstance(raw_category, str) else None

        # Validate and apply the text only when one was provided
        if recommendation_text:
            # Validate text length (50-2000 characters)
            if len(recommendation_text) < 50:
                return jsonify({
                    'success': False,
                    'error': 'Rekomendacja musi mieć co najmniej 50 znaków'
                }), 400

            if len(recommendation_text) > 2000:
                return jsonify({
                    'success': False,
                    'error': 'Rekomendacja nie może przekraczać 2000 znaków'
                }), 400

            recommendation.recommendation_text = recommendation_text

        # Update other fields only when their key is present in the payload
        if 'service_category' in data:
            recommendation.service_category = service_category
        if 'show_contact' in data:
            recommendation.show_contact = show_contact

        recommendation.updated_at = datetime.now()

        db.commit()

        logger.info(f"Recommendation edited: ID {rec_id} by user {current_user.id}")

        return jsonify({
            'success': True,
            'message': 'Rekomendacja została zaktualizowana',
            'recommendation_id': recommendation.id
        })

    except Exception as e:
        logger.error(f"Error editing recommendation {rec_id}: {e}")
        db.rollback()
        return jsonify({
            'success': False,
            'error': 'Wystąpił błąd podczas edycji rekomendacji'
        }), 500
    finally:
        db.close()
@app.route('/api/recommendations/<int:rec_id>/delete', methods=['POST'])
@login_required
def api_delete_recommendation(rec_id):
    """API: delete a recommendation (owner or admin only).

    Returns JSON: 404 when the recommendation does not exist, 403 when the
    caller is neither its author nor an admin, 500 (after rollback) on
    unexpected errors, otherwise a success message.
    """
    session_db = SessionLocal()
    try:
        target = session_db.query(CompanyRecommendation).filter_by(id=rec_id).first()
        if not target:
            return jsonify({
                'success': False,
                'error': 'Rekomendacja nie znaleziona'
            }), 404

        # Authorization: the author or an admin
        is_owner = target.user_id == current_user.id
        if not is_owner and not current_user.is_admin:
            return jsonify({
                'success': False,
                'error': 'Brak uprawnień do usunięcia tej rekomendacji'
            }), 403

        # Capture identifying details for the log line before the row is gone
        logged_company, logged_author = target.company_id, target.user_id

        session_db.delete(target)
        session_db.commit()

        logger.info(f"Recommendation deleted: ID {rec_id} (company {logged_company}, user {logged_author}) by user {current_user.id}")

        return jsonify(success=True, message='Rekomendacja została usunięta')

    except Exception as e:
        logger.error(f"Error deleting recommendation {rec_id}: {e}")
        session_db.rollback()
        return jsonify({
            'success': False,
            'error': 'Wystąpił błąd podczas usuwania rekomendacji'
        }), 500
    finally:
        session_db.close()
# ============================================================
# B2B CLASSIFIEDS ROUTES
# ============================================================
@app.route('/tablica')
@login_required
def classifieds_index():
    """Show the B2B classifieds board: active listings, 20 per page, newest first.

    Query parameters:
        type: optional listing type filter.
        category: optional category filter.
        page: 1-based page number (defaults to 1; values below 1 are
            treated as 1).
    """
    listing_type = request.args.get('type', '')
    category = request.args.get('category', '')
    # Clamp to the first page: page <= 0 would produce a negative OFFSET.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = 20

    db = SessionLocal()
    try:
        query = db.query(Classified).filter(
            Classified.is_active == True
        )

        # Optional filters
        if listing_type:
            query = query.filter(Classified.listing_type == listing_type)
        if category:
            query = query.filter(Classified.category == category)

        # Newest first
        query = query.order_by(Classified.created_at.desc())

        total = query.count()
        classifieds = query.limit(per_page).offset((page - 1) * per_page).all()

        # (value, label) pairs for the category filter
        categories = [
            ('uslugi', 'Usługi'),
            ('produkty', 'Produkty'),
            ('wspolpraca', 'Współpraca'),
            ('praca', 'Praca'),
            ('inne', 'Inne')
        ]

        return render_template('classifieds/index.html',
            classifieds=classifieds,
            categories=categories,
            listing_type=listing_type,
            category_filter=category,
            page=page,
            # Ceiling division: number of pages needed for `total` rows
            total_pages=(total + per_page - 1) // per_page
        )
    finally:
        db.close()
@app.route('/tablica/nowe', methods=['GET', 'POST'])
@login_required
def classifieds_new():
    """Create a new classified listing.

    GET renders the empty form. POST requires listing type, category, title
    and description; when any is missing the form is re-rendered with a
    flash message. A stored listing gets an expiry date 30 days from now and
    the user is redirected to the board.
    """
    if request.method != 'POST':
        return render_template('classifieds/new.html')

    form = request.form
    listing_type = form.get('listing_type', '')
    category = form.get('category', '')
    title = sanitize_input(form.get('title', ''), 255)
    description = form.get('description', '').strip()
    budget_info = sanitize_input(form.get('budget_info', ''), 255)
    location_info = sanitize_input(form.get('location_info', ''), 255)

    required_values = (listing_type, category, title, description)
    if not all(required_values):
        flash('Wszystkie wymagane pola muszą być wypełnione.', 'error')
        return render_template('classifieds/new.html')

    session_db = SessionLocal()
    try:
        # Listings expire automatically after 30 days
        expiry = datetime.now() + timedelta(days=30)

        session_db.add(Classified(
            author_id=current_user.id,
            company_id=current_user.company_id,
            listing_type=listing_type,
            category=category,
            title=title,
            description=description,
            budget_info=budget_info,
            location_info=location_info,
            expires_at=expiry
        ))
        session_db.commit()

        flash('Ogłoszenie dodane.', 'success')
        return redirect(url_for('classifieds_index'))
    finally:
        session_db.close()
@app.route('/tablica/<int:classified_id>')
@login_required
def classifieds_view(classified_id):
    """Render the detail page of one classified and bump its view counter.

    An unknown id flashes an error and redirects to the classifieds index.
    """
    db = SessionLocal()
    try:
        lookup = db.query(Classified).filter(Classified.id == classified_id)
        classified = lookup.first()
        if classified:
            # Every successful page view is counted.
            classified.views_count = classified.views_count + 1
            db.commit()
            return render_template('classifieds/view.html', classified=classified)

        flash('Ogłoszenie nie istnieje.', 'error')
        return redirect(url_for('classifieds_index'))
    finally:
        db.close()
@app.route('/tablica/<int:classified_id>/zakoncz', methods=['POST'])
@login_required
def classifieds_close(classified_id):
    """Deactivate a classified owned by the current user.

    Responds with JSON: success on deactivation, or 404 when the listing
    does not exist or was authored by someone else.
    """
    db = SessionLocal()
    try:
        # Matching on author_id doubles as the ownership check.
        own_listing = db.query(Classified).filter(
            Classified.id == classified_id,
            Classified.author_id == current_user.id
        ).first()

        if own_listing:
            own_listing.is_active = False
            db.commit()
            return jsonify({'success': True, 'message': 'Ogłoszenie zamknięte'})

        not_found = {'success': False, 'error': 'Ogłoszenie nie istnieje lub brak uprawnień'}
        return jsonify(not_found), 404
    finally:
        db.close()
# ============================================================
# NEW MEMBERS ROUTE
# ============================================================
@app.route('/nowi-czlonkowie')
@login_required
def new_members():
    """List active member companies created within the last ``days`` days.

    Query parameters:
        days: size of the look-back window in days (default 90). The value is
            clamped to the range 1..3650 so that a negative number cannot
            move the cutoff into the future and an oversized number cannot
            overflow the date arithmetic (which would surface as HTTP 500).
    """
    max_window_days = 3650
    requested_days = request.args.get('days', 90, type=int)
    days = max(1, min(requested_days, max_window_days))

    db = SessionLocal()
    try:
        cutoff_date = datetime.now() - timedelta(days=days)
        new_companies = db.query(Company).filter(
            Company.status == 'active',
            Company.created_at >= cutoff_date
        ).order_by(Company.created_at.desc()).all()
        return render_template('new_members.html',
            companies=new_companies,
            days=days,
            total=len(new_companies)
        )
    finally:
        db.close()
# ============================================================
# AUTHENTICATION ROUTES
# ============================================================
@app.route('/register', methods=['GET', 'POST'])
@limiter.limit("5 per hour")  # Limit registration attempts
def register():
    """Register a new user account and start e-mail verification.

    GET renders the registration form. POST validates the e-mail, password,
    name and 10-digit company NIP, creates an unverified user (linked to an
    active company with the same NIP when one exists), and attempts to send
    a verification link valid for 24 hours. Authenticated visitors are
    redirected to the index page.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    form_template = 'auth/register.html'
    if request.method != 'POST':
        return render_template(form_template)

    form = request.form
    email = sanitize_input(form.get('email', ''), 255)
    password = form.get('password', '')
    name = sanitize_input(form.get('name', ''), 255)
    company_nip = sanitize_input(form.get('company_nip', ''), 10)

    # Field validation: the first failing check is reported to the user.
    if not validate_email(email):
        flash('Nieprawidłowy format adresu email.', 'error')
        return render_template(form_template)

    password_valid, password_message = validate_password(password)
    if not password_valid:
        flash(password_message, 'error')
        return render_template(form_template)

    if not (name and email and company_nip):
        flash('Imię, email i NIP firmy są wymagane.', 'error')
        return render_template(form_template)

    if not re.match(r'^\d{10}$', company_nip):
        flash('NIP musi składać się z 10 cyfr.', 'error')
        return render_template(form_template)

    db = SessionLocal()
    try:
        already_registered = db.query(User).filter_by(email=email).first()
        if already_registered:
            flash('Email już jest zarejestrowany.', 'error')
            return render_template(form_template)

        # The NIP format was validated above, so the membership lookup can
        # run unconditionally: an active company with this NIP marks the
        # user as a NORDA member.
        member_company = db.query(Company).filter_by(nip=company_nip, status='active').first()
        is_norda_member = bool(member_company)
        company_id = member_company.id if member_company else None

        # One-time verification token, valid for 24 hours.
        verification_token = secrets.token_urlsafe(32)
        verification_expires = datetime.now() + timedelta(hours=24)

        db.add(User(
            email=email,
            password_hash=generate_password_hash(password, method='pbkdf2:sha256'),
            name=name,
            company_nip=company_nip,
            company_id=company_id,
            is_norda_member=is_norda_member,
            created_at=datetime.now(),
            is_active=True,
            is_verified=False,  # Requires email verification
            verification_token=verification_token,
            verification_token_expires=verification_expires
        ))
        db.commit()

        base_url = os.getenv('APP_URL', 'https://nordabiznes.pl')
        verification_url = f"{base_url}/verify-email/{verification_token}"

        # Mail delivery is best effort: a failure never aborts registration,
        # and the link is written to the log in every failure branch.
        try:
            import email_service
            if not email_service.is_configured():
                logger.warning("Email service not configured")
                logger.info(f"Verification URL (no email service): {verification_url}")
            elif email_service.send_welcome_email(email, name, verification_url):
                logger.info(f"Verification email sent to {email}")
            else:
                logger.warning(f"Failed to send verification email to {email}")
                logger.info(f"Verification URL (email failed): {verification_url}")
        except Exception as e:
            logger.error(f"Error sending verification email: {e}")
            logger.info(f"Verification URL (exception): {verification_url}")

        logger.info(f"New user registered: {email}")
        flash('Rejestracja udana! Sprawdz email i kliknij link weryfikacyjny.', 'success')
        return redirect(url_for('login'))
    except Exception as e:
        logger.error(f"Registration error: {e}")
        flash('Wystąpił błąd podczas rejestracji. Spróbuj ponownie.', 'error')
        return render_template(form_template)
    finally:
        db.close()
def _is_safe_next_url(target):
    """Return True when ``target`` is a same-site, path-only redirect target.

    A leading '/' alone is not sufficient: browsers treat '//host' and
    '/\\host' as protocol-relative URLs pointing at another site, and they
    strip tab/CR/LF characters before resolving a URL, so all of these
    forms are rejected.
    """
    if not target or not target.startswith('/'):
        return False
    if target.startswith(('//', '/\\')):
        return False
    return not any(ch in target for ch in '\t\r\n')


@app.route('/login', methods=['GET', 'POST'])
@limiter.limit("100 per hour")  # Increased for testing
def login():
    """Authenticate a user by e-mail and password.

    GET renders the login form. POST checks the credentials, refuses
    deactivated or unverified accounts, records ``last_login`` and redirects
    to the ``next`` query parameter when it is a safe local path, otherwise
    to the dashboard. Authenticated visitors are redirected to the index.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method == 'POST':
        email = sanitize_input(request.form.get('email', ''), 255)
        password = request.form.get('password', '')
        remember = request.form.get('remember', False) == 'on'

        # Basic validation
        if not email or not password:
            flash('Email i hasło są wymagane.', 'error')
            return render_template('auth/login.html')

        db = SessionLocal()
        try:
            user = db.query(User).filter_by(email=email).first()

            # Same message for unknown e-mail and wrong password.
            if not user or not check_password_hash(user.password_hash, password):
                logger.warning(f"Failed login attempt for: {email}")
                flash('Nieprawidłowy email lub hasło.', 'error')
                return render_template('auth/login.html')

            if not user.is_active:
                flash('Konto zostało dezaktywowane.', 'error')
                return render_template('auth/login.html')

            # Require email verification
            if not user.is_verified:
                flash('Musisz potwierdzic adres email przed zalogowaniem. Sprawdz skrzynke.', 'error')
                return render_template('auth/login.html')

            login_user(user, remember=remember)
            user.last_login = datetime.now()
            db.commit()
            logger.info(f"User logged in: {email}")

            # Prevent open redirects: only follow same-site relative paths.
            next_page = request.args.get('next')
            if not _is_safe_next_url(next_page):
                next_page = None
            return redirect(next_page or url_for('dashboard'))
        except Exception as e:
            logger.error(f"Login error: {e}")
            flash('Wystąpił błąd podczas logowania. Spróbuj ponownie.', 'error')
            return render_template('auth/login.html')
        finally:
            db.close()

    return render_template('auth/login.html')
@app.route('/logout')
@login_required
def logout():
    """Log the current user out and return them to the index page."""
    logout_user()
    flash('Wylogowano pomyślnie.', 'success')
    home_url = url_for('index')
    return redirect(home_url)
@app.route('/forgot-password', methods=['GET', 'POST'])
@limiter.limit("5 per hour")
def forgot_password():
    """Start the password-reset flow for an e-mail address.

    GET renders the request form. POST stores a one-hour reset token for a
    matching active user and tries to mail the reset link. The confirmation
    shown to the visitor is the same whether or not the account exists.
    Authenticated visitors are redirected to the index page.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    form_template = 'auth/forgot_password.html'
    if request.method != 'POST':
        return render_template(form_template)

    email = sanitize_input(request.form.get('email', ''), 255)
    if not validate_email(email):
        flash('Nieprawidłowy format adresu email.', 'error')
        return render_template(form_template)

    db = SessionLocal()
    try:
        account = db.query(User).filter_by(email=email, is_active=True).first()
        if account:
            # Persist a fresh token that expires in one hour.
            reset_token = secrets.token_urlsafe(32)
            account.reset_token = reset_token
            account.reset_token_expires = datetime.now() + timedelta(hours=1)
            db.commit()

            base_url = os.getenv('APP_URL', 'https://nordabiznes.pl')
            reset_url = f"{base_url}/reset-password/{reset_token}"

            # Delivery is best effort; each failure branch logs the link so
            # it can be recovered manually.
            try:
                import email_service
                if not email_service.is_configured():
                    logger.warning("Email service not configured")
                    logger.info(f"Reset URL (no email service): {reset_url}")
                elif email_service.send_password_reset_email(email, reset_url):
                    logger.info(f"Password reset email sent to {email}")
                else:
                    logger.warning(f"Failed to send password reset email to {email}")
                    logger.info(f"Reset URL (email failed): {reset_url}")
            except Exception as e:
                logger.error(f"Error sending reset email: {e}")
                logger.info(f"Reset URL (exception): {reset_url}")

        # Always show same message to prevent email enumeration
        flash('Jeśli email istnieje w systemie, instrukcje resetowania hasła zostały wysłane.', 'info')
        return redirect(url_for('login'))
    except Exception as e:
        logger.error(f"Password reset error: {e}")
        flash('Wystąpił błąd. Spróbuj ponownie.', 'error')
    finally:
        db.close()

    # Reached only after a handled error: show the form again.
    return render_template(form_template)
@app.route('/reset-password/<token>', methods=['GET', 'POST'])
@limiter.limit("10 per hour")
def reset_password(token):
    """Set a new password using a reset token from the e-mail link.

    The token must belong to an active user and must not be expired;
    otherwise the visitor is sent back to the forgot-password page. GET
    renders the form; POST checks that both password fields match and that
    the password passes ``validate_password``, then stores the new hash and
    clears the token.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    form_template = 'auth/reset_password.html'
    db = SessionLocal()
    try:
        # Token, expiry and account state are all checked in one query.
        account = db.query(User).filter(
            User.reset_token == token,
            User.reset_token_expires > datetime.now(),
            User.is_active == True
        ).first()
        if not account:
            flash('Link resetowania hasła jest nieprawidłowy lub wygasł.', 'error')
            return redirect(url_for('forgot_password'))

        if request.method != 'POST':
            return render_template(form_template, token=token)

        password = request.form.get('password', '')
        password_confirm = request.form.get('password_confirm', '')

        if password != password_confirm:
            flash('Hasła nie są identyczne.', 'error')
            return render_template(form_template, token=token)

        password_valid, password_message = validate_password(password)
        if not password_valid:
            flash(password_message, 'error')
            return render_template(form_template, token=token)

        # Store the new hash and clear the token so it cannot be reused.
        account.password_hash = generate_password_hash(password, method='pbkdf2:sha256')
        account.reset_token = None
        account.reset_token_expires = None
        db.commit()

        logger.info(f"Password reset successful for {account.email}")
        flash('Hasło zostało zmienione. Możesz się teraz zalogować.', 'success')
        return redirect(url_for('login'))
    except Exception as e:
        logger.error(f"Reset password error: {e}")
        flash('Wystąpił błąd. Spróbuj ponownie.', 'error')
        return redirect(url_for('forgot_password'))
    finally:
        db.close()
@app.route('/verify-email/<token>')
def verify_email(token):
    """Confirm a user's e-mail address from the link sent after registration.

    The token must belong to an active user and must not be expired. Every
    outcome (success, invalid link, already verified, error) flashes a
    message and redirects to the login page.
    """
    db = SessionLocal()
    try:
        account = db.query(User).filter(
            User.verification_token == token,
            User.verification_token_expires > datetime.now(),
            User.is_active == True
        ).first()

        if not account:
            flash('Link weryfikacyjny jest nieprawidłowy lub wygasł.', 'error')
        elif account.is_verified:
            flash('Email został już zweryfikowany.', 'info')
        else:
            # Mark the account verified and clear the single-use token.
            account.is_verified = True
            account.verified_at = datetime.now()
            account.verification_token = None
            account.verification_token_expires = None
            db.commit()

            logger.info(f"Email verified for {account.email}")
            flash('Email został zweryfikowany! Możesz się teraz zalogować.', 'success')

        return redirect(url_for('login'))
    except Exception as e:
        logger.error(f"Email verification error: {e}")
        flash('Wystąpił błąd podczas weryfikacji.', 'error')
        return redirect(url_for('login'))
    finally:
        db.close()
@app.route('/resend-verification', methods=['GET', 'POST'])
@limiter.limit("5 per hour")
def resend_verification():
    """Issue a new e-mail verification link for an unverified account.

    GET renders the request form. POST stores a fresh 24-hour token for a
    matching active, unverified user and tries to mail the link. The
    confirmation message is identical whether or not such an account
    exists. Authenticated visitors are redirected to the index page.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    form_template = 'auth/resend_verification.html'
    if request.method != 'POST':
        return render_template(form_template)

    email = sanitize_input(request.form.get('email', ''), 255)
    if not validate_email(email):
        flash('Nieprawidłowy format adresu email.', 'error')
        return render_template(form_template)

    db = SessionLocal()
    try:
        account = db.query(User).filter_by(email=email, is_active=True).first()
        if account and not account.is_verified:
            # Replace any previous token with a new one valid for 24 hours.
            verification_token = secrets.token_urlsafe(32)
            account.verification_token = verification_token
            account.verification_token_expires = datetime.now() + timedelta(hours=24)
            db.commit()

            base_url = os.getenv('APP_URL', 'https://nordabiznes.pl')
            verification_url = f"{base_url}/verify-email/{verification_token}"

            # Delivery is best effort; each failure branch logs the link.
            try:
                import email_service
                if not email_service.is_configured():
                    logger.warning("Email service not configured")
                    logger.info(f"Verification URL (no email service): {verification_url}")
                elif email_service.send_welcome_email(email, account.name, verification_url):
                    logger.info(f"Verification email resent to {email}")
                else:
                    logger.warning(f"Failed to resend verification email to {email}")
                    logger.info(f"Verification URL (email failed): {verification_url}")
            except Exception as e:
                logger.error(f"Error resending verification email: {e}")
                logger.info(f"Verification URL (exception): {verification_url}")

        # Always show same message to prevent email enumeration
        flash('Jesli konto istnieje i nie zostalo zweryfikowane, email weryfikacyjny zostal wyslany.', 'info')
        return redirect(url_for('login'))
    except Exception as e:
        logger.error(f"Resend verification error: {e}")
        flash('Wystapil blad. Sprobuj ponownie.', 'error')
    finally:
        db.close()

    # Reached only after a handled error: show the form again.
    return render_template(form_template)
# ============================================================
# USER DASHBOARD
# ============================================================
@app.route('/dashboard')
@login_required
def dashboard():
    """Render the user's dashboard.

    Shows the ten most recently updated AI chat conversations of the
    current user together with their total conversation and message counts.
    """
    db = SessionLocal()
    try:
        user_id = current_user.id
        own_conversations = db.query(AIChatConversation).filter_by(user_id=user_id)

        recent = own_conversations.order_by(
            AIChatConversation.updated_at.desc()
        ).limit(10).all()
        conversation_total = own_conversations.count()

        # Messages are counted through their parent conversation's owner.
        message_total = db.query(AIChatMessage).join(AIChatConversation).filter(
            AIChatConversation.user_id == user_id
        ).count()

        return render_template(
            'dashboard.html',
            conversations=recent,
            total_conversations=conversation_total,
            total_messages=message_total
        )
    finally:
        db.close()
# ============================================================
# AI CHAT ROUTES
# ============================================================
@app.route('/chat')
@login_required
def chat():
    """Serve the AI chat page to a logged-in user."""
    page_template = 'chat.html'
    return render_template(page_template)
@app.route('/api/chat/start', methods=['POST'])
@login_required
def chat_start():
    """Start a new AI chat conversation for the current user.

    Request JSON body (optional):
        title: conversation title. When the body is missing, is not a JSON
            object, or carries a blank or non-string title, a timestamped
            default title is used instead of failing the request.

    Returns JSON with ``conversation_id`` and ``title`` on success, or a
    500 response with the error text when the chat engine fails.
    """
    try:
        # silent=True yields None for a missing or invalid JSON body instead
        # of raising; treat anything that is not a JSON object as empty.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}

        title = payload.get('title')
        if not isinstance(title, str) or not title.strip():
            title = f"Rozmowa - {datetime.now().strftime('%Y-%m-%d %H:%M')}"

        chat_engine = NordaBizChatEngine()
        conversation = chat_engine.start_conversation(
            user_id=current_user.id,
            title=title
        )
        return jsonify({
            'success': True,
            'conversation_id': conversation.id,
            'title': conversation.title
        })
    except Exception as e:
        logger.error(f"Error starting chat: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/chat/<int:conversation_id>/message', methods=['POST'])
@login_required
def chat_send_message(conversation_id):
    """Send a user message to the AI chat and return the assistant's reply.

    Request JSON body:
        message: non-empty text of the user's message.

    Responses:
        200: reply text plus technical metadata (token counts, latency,
            theoretical cost, free-tier usage).
        400: body missing, not a JSON object, or message empty/non-string.
        404: conversation does not exist or belongs to another user.
        500: unexpected failure; the error text is included.
    """
    try:
        # A missing, malformed or non-object body is a client error (400),
        # not a server error, so parse defensively.
        payload = request.get_json(silent=True)
        raw_message = payload.get('message') if isinstance(payload, dict) else None
        message = raw_message.strip() if isinstance(raw_message, str) else ''
        if not message:
            return jsonify({'success': False, 'error': 'Wiadomość nie może być pusta'}), 400

        # Verify conversation belongs to user
        db = SessionLocal()
        try:
            conversation = db.query(AIChatConversation).filter_by(
                id=conversation_id,
                user_id=current_user.id
            ).first()
            if not conversation:
                return jsonify({'success': False, 'error': 'Conversation not found'}), 404
        finally:
            db.close()

        chat_engine = NordaBizChatEngine()
        response = chat_engine.send_message(
            conversation_id=conversation_id,
            user_message=message,
            user_id=current_user.id
        )

        # Get free tier usage stats for today
        free_tier_stats = get_free_tier_usage()
        daily_limit = 1500  # Gemini free tier: 1500 req/day

        # Calculate theoretical cost (Gemini 2.0 Flash pricing)
        tokens_in = response.tokens_input or 0
        tokens_out = response.tokens_output or 0
        theoretical_cost = (tokens_in / 1_000_000) * 0.075 + (tokens_out / 1_000_000) * 0.30

        return jsonify({
            'success': True,
            'message': response.content,
            'message_id': response.id,
            'created_at': response.created_at.isoformat(),
            # Technical metadata
            'tech_info': {
                'model': 'gemini-2.0-flash',
                'data_source': 'PostgreSQL (80 firm Norda Biznes)',
                'architecture': 'Full DB Context (wszystkie firmy w kontekście AI)',
                'tokens_input': tokens_in,
                'tokens_output': tokens_out,
                'tokens_total': tokens_in + tokens_out,
                'latency_ms': response.latency_ms or 0,
                'theoretical_cost_usd': round(theoretical_cost, 6),
                'actual_cost_usd': 0.0,  # Free tier
                'free_tier': {
                    'is_free': True,
                    'daily_limit': daily_limit,
                    'requests_today': free_tier_stats['requests_today'],
                    'tokens_today': free_tier_stats['tokens_today'],
                    'remaining': max(0, daily_limit - free_tier_stats['requests_today'])
                }
            }
        })
    except Exception as e:
        logger.error(f"Error sending message: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/chat/<int:conversation_id>/history', methods=['GET'])
@login_required
def chat_get_history(conversation_id):
    """Return the message history of one of the current user's conversations.

    Responds with 404 when the conversation does not exist or is owned by
    another user, and with 500 plus the error text on unexpected failures.
    """
    try:
        # Ownership check: the lookup only matches the caller's conversations.
        db = SessionLocal()
        try:
            owned = db.query(AIChatConversation).filter_by(
                id=conversation_id,
                user_id=current_user.id
            ).first()
        finally:
            db.close()

        if not owned:
            return jsonify({'success': False, 'error': 'Conversation not found'}), 404

        messages = NordaBizChatEngine().get_conversation_history(conversation_id)
        return jsonify({'success': True, 'messages': messages})
    except Exception as e:
        logger.error(f"Error getting history: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
# ============================================================
# API ROUTES (for frontend)
# ============================================================
@app.route('/api/companies')
def api_companies():
    """API: return every active company as a JSON list of summary records."""

    def summarize(company):
        # Category is optional on a company, hence the None fallback.
        category = company.category
        return {
            'id': company.id,
            'name': company.name,
            'category': category.name if category else None,
            'description': company.description_short,
            'website': company.website,
            'phone': company.phone,
            'email': company.email
        }

    db = SessionLocal()
    try:
        active_companies = db.query(Company).filter_by(status='active').all()
        payload = {
            'success': True,
            'companies': list(map(summarize, active_companies))
        }
        return jsonify(payload)
    finally:
        db.close()
def _build_seo_audit_response(company, analysis):
    """
    Build the SEO audit response dictionary for one company.

    Used by both /api/seo/audit and /api/seo/audit/<slug> endpoints.

    Args:
        company: object exposing ``id``, ``name`` and ``website``.
        analysis: analysis record exposing the SEO audit attributes read
            below (scores, on-page metrics, technical flags, stored issues).

    Returns:
        dict with the company identity and a ``seo_audit`` section holding
        PageSpeed scores, on-page, technical, Core Web Vitals, social and
        language data, plus an ``issues`` list. Issues are derived from the
        individual checks first, followed by any dict entries stored in
        ``analysis.seo_issues``.
    """
    issues = []

    def add_issue(severity, message, category):
        issues.append({
            'severity': severity,
            'message': message,
            'category': category
        })

    # Check for images without alt
    if analysis.images_without_alt and analysis.images_without_alt > 0:
        add_issue('warning', f'{analysis.images_without_alt} obrazów nie ma atrybutu alt', 'accessibility')

    # Check for missing meta description
    if not analysis.meta_description:
        add_issue('warning', 'Brak meta description', 'on_page')

    # Check H1 count (should be exactly 1); None means "not measured".
    if analysis.h1_count is not None:
        if analysis.h1_count == 0:
            add_issue('error', 'Brak nagłówka H1 na stronie', 'on_page')
        elif analysis.h1_count > 1:
            add_issue('warning', f'Strona zawiera {analysis.h1_count} nagłówków H1 (zalecany: 1)', 'on_page')

    # Tri-state flags: only an explicit False is reported. None means the
    # check was not performed and must not produce an issue.
    flag_checks = (
        (analysis.has_ssl, 'error',
         'Strona nie używa HTTPS (brak certyfikatu SSL)', 'security'),
        (analysis.has_robots_txt, 'info',
         'Brak pliku robots.txt', 'technical'),
        (analysis.has_sitemap, 'info',
         'Brak pliku sitemap.xml', 'technical'),
        (analysis.is_indexable, 'error',
         f'Strona nie jest indeksowalna: {analysis.noindex_reason or "nieznana przyczyna"}', 'technical'),
        (analysis.has_structured_data, 'info',
         'Brak danych strukturalnych (Schema.org)', 'on_page'),
        (analysis.has_og_tags, 'info',
         'Brak tagów Open Graph (ważne dla udostępniania w social media)', 'social'),
        (analysis.is_mobile_friendly, 'warning',
         'Strona nie jest przyjazna dla urządzeń mobilnych', 'technical'),
    )
    for flag, severity, message, category in flag_checks:
        if flag is False:
            add_issue(severity, message, category)

    # Add issues from the stored seo_issues field; anything that is not a
    # list of dicts is ignored.
    if analysis.seo_issues and isinstance(analysis.seo_issues, list):
        issues.extend(item for item in analysis.seo_issues if isinstance(item, dict))

    # A layout shift of 0 is a valid (best possible) measurement, so test
    # against None rather than truthiness to avoid reporting it as missing.
    layout_shift = analysis.cumulative_layout_shift
    layout_shift = float(layout_shift) if layout_shift is not None else None

    return {
        'success': True,
        'company_id': company.id,
        'company_name': company.name,
        'website': company.website,
        'seo_audit': {
            'audited_at': analysis.seo_audited_at.isoformat() if analysis.seo_audited_at else None,
            'audit_version': analysis.seo_audit_version,
            'overall_score': analysis.seo_overall_score,
            'pagespeed': {
                'seo_score': analysis.pagespeed_seo_score,
                'performance_score': analysis.pagespeed_performance_score,
                'accessibility_score': analysis.pagespeed_accessibility_score,
                'best_practices_score': analysis.pagespeed_best_practices_score
            },
            'on_page': {
                'meta_title': analysis.meta_title,
                'meta_description': analysis.meta_description,
                'h1_count': analysis.h1_count,
                'h1_text': analysis.h1_text,
                'h2_count': analysis.h2_count,
                'h3_count': analysis.h3_count,
                'total_images': analysis.total_images,
                'images_without_alt': analysis.images_without_alt,
                'images_with_alt': analysis.images_with_alt,
                'internal_links_count': analysis.internal_links_count,
                'external_links_count': analysis.external_links_count,
                'has_structured_data': analysis.has_structured_data,
                'structured_data_types': analysis.structured_data_types
            },
            'technical': {
                'has_ssl': analysis.has_ssl,
                'ssl_issuer': analysis.ssl_issuer,
                'ssl_expires_at': analysis.ssl_expires_at.isoformat() if analysis.ssl_expires_at else None,
                'has_sitemap': analysis.has_sitemap,
                'has_robots_txt': analysis.has_robots_txt,
                'has_canonical': analysis.has_canonical,
                'canonical_url': analysis.canonical_url,
                'is_indexable': analysis.is_indexable,
                'noindex_reason': analysis.noindex_reason,
                'is_mobile_friendly': analysis.is_mobile_friendly,
                'viewport_configured': analysis.viewport_configured,
                'load_time_ms': analysis.load_time_ms,
                'http_status_code': analysis.http_status_code
            },
            'core_web_vitals': {
                'largest_contentful_paint_ms': analysis.largest_contentful_paint_ms,
                'first_input_delay_ms': analysis.first_input_delay_ms,
                'cumulative_layout_shift': layout_shift
            },
            'social': {
                'has_og_tags': analysis.has_og_tags,
                'og_title': analysis.og_title,
                'og_description': analysis.og_description,
                'og_image': analysis.og_image,
                'has_twitter_cards': analysis.has_twitter_cards
            },
            'language': {
                'html_lang': analysis.html_lang,
                'has_hreflang': analysis.has_hreflang
            },
            'issues': issues
        }
    }
def _get_seo_audit_for_company(db, company):
    """
    Look up the newest website analysis of a company and shape the API reply.

    Always returns a ``(response_dict, 200)`` tuple. When the company has no
    analysis record, or the record has no ``seo_audited_at`` timestamp, the
    dict carries ``seo_audit: None`` and an explanatory message; otherwise it
    is the full payload produced by ``_build_seo_audit_response``.
    """
    latest = db.query(CompanyWebsiteAnalysis).filter_by(
        company_id=company.id
    ).order_by(CompanyWebsiteAnalysis.analyzed_at.desc()).first()

    if not latest:
        # No analysis row at all for this company.
        notice = 'Brak danych SEO dla tej firmy. Audyt nie został jeszcze przeprowadzony.'
    elif not latest.seo_audited_at:
        # A row exists, but the SEO audit part was never run.
        notice = 'Audyt SEO nie został jeszcze przeprowadzony dla tej firmy.'
    else:
        return _build_seo_audit_response(company, latest), 200

    return {
        'success': True,
        'company_id': company.id,
        'company_name': company.name,
        'website': company.website,
        'seo_audit': None,
        'message': notice
    }, 200
@app.route('/api/seo/audit')
def api_seo_audit():
    """
    API: Get SEO audit results for a company.

    Query parameters (at least one is required):
    - company_id: Company ID (integer); takes precedence when both are given
    - slug: Company slug (string)

    Returns JSON with:
    - pagespeed scores (seo, performance, accessibility, best_practices)
    - on_page metrics (meta tags, headings, images, links, structured data)
    - technical checks (ssl, sitemap, robots.txt, mobile-friendly)
    - issues list with severity levels

    Responds with 400 when neither parameter is supplied and 404 when no
    active company matches.
    """
    company_id = request.args.get('company_id', type=int)
    slug = request.args.get('slug', type=str)

    if not (company_id or slug):
        return jsonify({
            'success': False,
            'error': 'Podaj company_id lub slug firmy'
        }), 400

    # Pick the lookup key once; only active companies are considered.
    lookup = {'id': company_id} if company_id else {'slug': slug}

    db = SessionLocal()
    try:
        company = db.query(Company).filter_by(**lookup, status='active').first()
        if company:
            response, status_code = _get_seo_audit_for_company(db, company)
            return jsonify(response), status_code

        return jsonify({
            'success': False,
            'error': 'Firma nie znaleziona'
        }), 404
    finally:
        db.close()
@app.route('/api/seo/audit/<slug>')
def api_seo_audit_by_slug(slug):
    """
    API: Get SEO audit results for the company identified by the URL slug.

    Convenience variant of /api/seo/audit that reads the slug from the path.
    Responds with 404 when no active company has that slug.

    Example: GET /api/seo/audit/pixlab-sp-z-o-o
    """
    db = SessionLocal()
    try:
        company = db.query(Company).filter_by(slug=slug, status='active').first()
        if company:
            response, status_code = _get_seo_audit_for_company(db, company)
            return jsonify(response), status_code

        return jsonify({
            'success': False,
            'error': 'Firma nie znaleziona'
        }), 404
    finally:
        db.close()
@app.route('/api/seo/audit', methods=['POST'])
@login_required
@limiter.limit("10 per hour")
def api_seo_audit_trigger():
    """
    API: Trigger SEO audit for a company (admin-only).

    This endpoint runs a full SEO audit including:
    - Google PageSpeed Insights analysis
    - On-page SEO analysis (meta tags, headings, images, links)
    - Technical SEO checks (robots.txt, sitemap, canonical URLs)

    Request JSON body (must be a JSON object):
    - company_id: Company ID (integer) OR
    - slug: Company slug (string)

    Responses:
    - 200: audit finished and saved; full audit payload is returned
    - 400: body missing, not a JSON object, company_id not numeric,
      neither identifier given, or the company has no website
    - 403: caller is not an administrator
    - 404: no active company matches
    - 422: the audit produced only errors
    - 500: the audit raised or its result could not be saved
    - 503: SEO audit service is not available

    Rate limited to 10 requests per hour per user to prevent API abuse.
    """
    # Admin-only check
    if not current_user.is_admin:
        return jsonify({
            'success': False,
            'error': 'Brak uprawnień. Tylko administrator może uruchamiać audyty SEO.'
        }), 403

    # Check if SEO audit service is available
    if not SEO_AUDIT_AVAILABLE:
        return jsonify({
            'success': False,
            'error': 'Usługa audytu SEO jest niedostępna. Sprawdź konfigurację serwera.'
        }), 503

    # Parse request data. silent=True turns a missing or malformed body into
    # None, and the isinstance check rejects JSON arrays and scalars, which
    # would otherwise fail on .get() with an unhandled 500.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({
            'success': False,
            'error': 'Brak danych w żądaniu. Podaj company_id lub slug.'
        }), 400

    company_id = data.get('company_id')
    slug = data.get('slug')

    if not company_id and not slug:
        return jsonify({
            'success': False,
            'error': 'Podaj company_id lub slug firmy do audytu.'
        }), 400

    if company_id:
        # Reject non-numeric ids here instead of letting them reach the
        # database query.
        try:
            company_id = int(company_id)
        except (TypeError, ValueError):
            return jsonify({
                'success': False,
                'error': 'Podaj company_id lub slug firmy do audytu.'
            }), 400

    db = SessionLocal()
    try:
        # Find company by ID or slug
        if company_id:
            company = db.query(Company).filter_by(id=company_id, status='active').first()
        else:
            company = db.query(Company).filter_by(slug=slug, status='active').first()

        if not company:
            return jsonify({
                'success': False,
                'error': 'Firma nie znaleziona lub nieaktywna.'
            }), 404

        # Check if company has a website
        if not company.website:
            return jsonify({
                'success': False,
                'error': f'Firma "{company.name}" nie ma zdefiniowanej strony internetowej.',
                'company_id': company.id,
                'company_name': company.name
            }), 400

        logger.info(f"SEO audit triggered by admin {current_user.email} for company: {company.name} (ID: {company.id})")

        # Initialize SEO auditor and run audit
        try:
            auditor = SEOAuditor()

            # Prepare company dict for auditor
            company_dict = {
                'id': company.id,
                'name': company.name,
                'slug': company.slug,
                'website': company.website,
                'address_city': company.address_city
            }

            # Run the audit
            audit_result = auditor.audit_company(company_dict)

            # Fail only when errors were reported and neither the on-page
            # nor the PageSpeed part produced any data.
            if audit_result.get('errors') and not audit_result.get('onpage') and not audit_result.get('pagespeed'):
                return jsonify({
                    'success': False,
                    'error': f'Audyt nie powiódł się: {", ".join(audit_result["errors"])}',
                    'company_id': company.id,
                    'company_name': company.name,
                    'website': company.website
                }), 422

            # Save result to database
            saved = auditor.save_audit_result(audit_result)
            if not saved:
                return jsonify({
                    'success': False,
                    'error': 'Audyt został wykonany, ale nie udało się zapisać wyników do bazy danych.',
                    'company_id': company.id,
                    'company_name': company.name
                }), 500

            # Get the updated analysis record to return
            db.expire_all()  # Refresh the session to get updated data
            analysis = db.query(CompanyWebsiteAnalysis).filter_by(
                company_id=company.id
            ).order_by(CompanyWebsiteAnalysis.analyzed_at.desc()).first()

            # Build response using the existing helper function
            response = _build_seo_audit_response(company, analysis)
            return jsonify({
                'success': True,
                'message': f'Audyt SEO dla firmy "{company.name}" został zakończony pomyślnie.',
                'audit_version': SEO_AUDIT_VERSION,
                'triggered_by': current_user.email,
                'triggered_at': datetime.now().isoformat(),
                **response
            }), 200
        except Exception as e:
            logger.error(f"SEO audit error for company {company.id}: {e}")
            return jsonify({
                'success': False,
                'error': f'Błąd podczas wykonywania audytu: {str(e)}',
                'company_id': company.id,
                'company_name': company.name
            }), 500
    finally:
        db.close()
# ============================================================
# SEO ADMIN DASHBOARD
# ============================================================
@app.route('/admin/seo')
@login_required
def admin_seo():
    """
    Admin dashboard for SEO metrics overview.

    Displays:
    - Summary stats (score distribution, average score)
    - Sortable table of all companies with SEO scores
    - Color-coded score badges (green 90-100, yellow 50-89, red 0-49)
    - Filtering by category, score range, and search text
    - Last audit date with staleness indicator
    - Actions: view profile, trigger single company audit

    Query Parameters:
    - company: Slug of company to highlight/filter (optional)

    Non-admin users are redirected to their dashboard with an error flash.
    """
    if not current_user.is_admin:
        flash('Brak uprawnień do tej strony.', 'error')
        return redirect(url_for('dashboard'))

    # Get optional company filter from URL
    filter_company_slug = request.args.get('company', '')

    class CompanyRow:
        """Attribute bag handed to the template, one per table row."""

        def __init__(self, data):
            for key, value in data.items():
                setattr(self, key, value)

    db = SessionLocal()
    try:
        # Outer joins keep companies that have no category or no SEO data.
        # NOTE(review): the join is not restricted to the newest analysis
        # row, so a company with several CompanyWebsiteAnalysis rows would
        # be listed once per row — confirm company_id is unique there.
        rows = db.query(
            Company.id,
            Company.name,
            Company.slug,
            Company.website,
            Category.name.label('category_name'),
            CompanyWebsiteAnalysis.pagespeed_seo_score,
            CompanyWebsiteAnalysis.pagespeed_performance_score,
            CompanyWebsiteAnalysis.pagespeed_accessibility_score,
            CompanyWebsiteAnalysis.pagespeed_best_practices_score,
            CompanyWebsiteAnalysis.seo_audited_at
        ).outerjoin(
            Category,
            Company.category_id == Category.id
        ).outerjoin(
            CompanyWebsiteAnalysis,
            Company.id == CompanyWebsiteAnalysis.company_id
        ).filter(
            Company.status == 'active'
        ).order_by(
            Company.name
        ).all()

        # Map query columns to the attribute names the template expects.
        companies = [
            CompanyRow({
                'id': row.id,
                'name': row.name,
                'slug': row.slug,
                'website': row.website,
                'category': row.category_name,
                'seo_score': row.pagespeed_seo_score,
                'performance_score': row.pagespeed_performance_score,
                'accessibility_score': row.pagespeed_accessibility_score,
                'best_practices_score': row.pagespeed_best_practices_score,
                'seo_audited_at': row.seo_audited_at
            })
            for row in rows
        ]

        # A company counts as audited once it has a PageSpeed SEO score.
        scores = [c.seo_score for c in companies if c.seo_score is not None]
        stats = {
            'good_count': sum(1 for score in scores if score >= 90),
            'medium_count': sum(1 for score in scores if 50 <= score < 90),
            'poor_count': sum(1 for score in scores if score < 50),
            'not_audited_count': len(companies) - len(scores),
            # Average over audited companies only; None when there are none.
            'avg_score': round(sum(scores) / len(scores)) if scores else None
        }

        # Get unique categories for filter dropdown
        categories = sorted({c.category for c in companies if c.category})

        return render_template('admin_seo_dashboard.html',
            companies=companies,
            stats=stats,
            categories=categories,
            now=datetime.now(),
            filter_company=filter_company_slug
        )
    finally:
        db.close()
# ============================================================
# GBP (GOOGLE BUSINESS PROFILE) AUDIT API
# ============================================================
@app.route('/api/gbp/audit/health')
def api_gbp_audit_health():
    """
    API: Health check for the GBP audit service.

    Returns:
        200 with status 'ok' and the audit version when GBP_AUDIT_AVAILABLE
        is truthy; otherwise 503 with status 'unavailable' and an error text.
    """
    if not GBP_AUDIT_AVAILABLE:
        unavailable = {
            'status': 'unavailable',
            'service': 'gbp_audit',
            'available': False,
            'error': 'GBP audit service not loaded'
        }
        return jsonify(unavailable), 503

    healthy = {
        'status': 'ok',
        'service': 'gbp_audit',
        'version': GBP_AUDIT_VERSION,
        'available': True
    }
    return jsonify(healthy), 200
@app.route('/api/gbp/audit', methods=['GET'])
def api_gbp_audit_get():
    """
    API: Return the latest stored GBP audit for a company.

    Query parameters:
        company_id: Company ID (integer), OR
        slug: Company slug (string); only consulted when company_id is falsy.

    Returns:
        200 with the audit payload,
        400 when neither identifier is supplied,
        404 when no active company matches or no audit exists for it,
        500 on an unexpected error,
        503 when the GBP audit service is not available.

    Example: GET /api/gbp/audit?company_id=26
    Example: GET /api/gbp/audit?slug=pixlab-sp-z-o-o
    """
    if not GBP_AUDIT_AVAILABLE:
        return jsonify({
            'success': False,
            'error': 'Usługa audytu GBP jest niedostępna.'
        }), 503

    company_id = request.args.get('company_id', type=int)
    slug = request.args.get('slug')
    if not (company_id or slug):
        return jsonify({
            'success': False,
            'error': 'Podaj company_id lub slug firmy.'
        }), 400

    db = SessionLocal()
    try:
        # company_id wins over slug when both are present
        lookup = {'id': company_id} if company_id else {'slug': slug}
        company = db.query(Company).filter_by(**lookup, status='active').first()
        if not company:
            return jsonify({
                'success': False,
                'error': 'Firma nie znaleziona lub nieaktywna.'
            }), 404

        audit = gbp_get_company_audit(db, company.id)
        if not audit:
            return jsonify({
                'success': False,
                'error': f'Brak wyników audytu GBP dla firmy "{company.name}". Uruchom audyt używając POST /api/gbp/audit.',
                'company_id': company.id,
                'company_name': company.name
            }), 404

        # Assemble the audit section in the same key order as before
        audit_payload = {
            'id': audit.id,
            'audit_date': audit.audit_date.isoformat() if audit.audit_date else None,
            'completeness_score': audit.completeness_score,
            'score_category': audit.score_category,
            'fields_status': audit.fields_status,
            'recommendations': audit.recommendations,
        }
        presence_flags = (
            'has_name', 'has_address', 'has_phone', 'has_website', 'has_hours',
            'has_categories', 'has_photos', 'has_description', 'has_services',
            'has_reviews',
        )
        for flag in presence_flags:
            audit_payload[flag] = getattr(audit, flag)
        audit_payload.update({
            'photo_count': audit.photo_count,
            'review_count': audit.review_count,
            'average_rating': float(audit.average_rating) if audit.average_rating else None,
            'google_place_id': audit.google_place_id,
            'audit_source': audit.audit_source,
            'audit_version': audit.audit_version
        })

        return jsonify({
            'success': True,
            'company_id': company.id,
            'company_name': company.name,
            'company_slug': company.slug,
            'audit': audit_payload
        }), 200
    except Exception as e:
        logger.error(f"Error fetching GBP audit: {e}")
        return jsonify({
            'success': False,
            'error': f'Błąd podczas pobierania audytu: {str(e)}'
        }), 500
    finally:
        db.close()
@app.route('/api/gbp/audit/<slug>')
def api_gbp_audit_by_slug(slug):
    """
    API: Return the latest stored GBP audit for a company, addressed by slug.

    Convenience endpoint that takes the slug from the URL path and returns
    a reduced audit payload (scores, field status, recommendations, photo
    and review figures).

    Example: GET /api/gbp/audit/pixlab-sp-z-o-o
    """
    if not GBP_AUDIT_AVAILABLE:
        return jsonify({
            'success': False,
            'error': 'Usługa audytu GBP jest niedostępna.'
        }), 503

    db = SessionLocal()
    try:
        company = db.query(Company).filter_by(slug=slug, status='active').first()
        if not company:
            not_found = {
                'success': False,
                'error': f'Firma o slug "{slug}" nie znaleziona.'
            }
            return jsonify(not_found), 404

        audit = gbp_get_company_audit(db, company.id)
        if not audit:
            no_audit = {
                'success': False,
                'error': f'Brak wyników audytu GBP dla firmy "{company.name}".',
                'company_id': company.id,
                'company_name': company.name
            }
            return jsonify(no_audit), 404

        audited_on = audit.audit_date.isoformat() if audit.audit_date else None
        rating = float(audit.average_rating) if audit.average_rating else None
        body = {
            'success': True,
            'company_id': company.id,
            'company_name': company.name,
            'company_slug': company.slug,
            'audit': {
                'id': audit.id,
                'audit_date': audited_on,
                'completeness_score': audit.completeness_score,
                'score_category': audit.score_category,
                'fields_status': audit.fields_status,
                'recommendations': audit.recommendations,
                'photo_count': audit.photo_count,
                'review_count': audit.review_count,
                'average_rating': rating
            }
        }
        return jsonify(body), 200
    finally:
        db.close()
@app.route('/api/gbp/audit', methods=['POST'])
@login_required
@limiter.limit("20 per hour")
def api_gbp_audit_trigger():
    """
    API: Run a GBP audit for a company.

    Optionally refreshes Google Business data first, then runs the
    completeness audit and returns per-field status, score and
    recommendations.

    Request JSON body (must be a JSON object):
        company_id: Company ID (integer), OR
        slug: Company slug (string)
        save: Whether to save results to the database (default: true)
        fetch_google: Fetch Google data before auditing (default: true)
        force_refresh: Passed through to the Google fetch (default: false)

    Returns:
        200 with audit results,
        400 for a missing/invalid body, missing identifier, or a ValueError
            raised by the audit,
        403 when a non-admin targets a company other than their own,
        404 when no active company matches,
        500 on an unexpected audit error,
        503 when the GBP audit service is not available.

    Access:
        Members can audit their own company; admins can audit any company.
    """
    if not GBP_AUDIT_AVAILABLE:
        return jsonify({
            'success': False,
            'error': 'Usługa audytu GBP jest niedostępna. Sprawdź konfigurację serwera.'
        }), 503

    # silent=True turns malformed or non-JSON bodies into None; the dict check
    # rejects valid-but-non-object JSON (e.g. a list) that would otherwise
    # crash on data.get() below with an unhandled 500.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({
            'success': False,
            'error': 'Brak danych w żądaniu. Podaj company_id lub slug.'
        }), 400

    company_id = data.get('company_id')
    slug = data.get('slug')
    save_result = data.get('save', True)

    if not company_id and not slug:
        return jsonify({
            'success': False,
            'error': 'Podaj company_id lub slug firmy do audytu.'
        }), 400

    db = SessionLocal()
    try:
        # Find company by ID or slug (ID takes precedence)
        if company_id:
            company = db.query(Company).filter_by(id=company_id, status='active').first()
        else:
            company = db.query(Company).filter_by(slug=slug, status='active').first()

        if not company:
            return jsonify({
                'success': False,
                'error': 'Firma nie znaleziona lub nieaktywna.'
            }), 404

        # Access check: admin can audit any company, member only their own
        if not current_user.is_admin and current_user.company_id != company.id:
            return jsonify({
                'success': False,
                'error': 'Brak uprawnień. Możesz audytować tylko własną firmę.'
            }), 403

        logger.info(f"GBP audit triggered by {current_user.email} for company: {company.name} (ID: {company.id})")

        fetch_google = data.get('fetch_google', True)
        force_refresh = data.get('force_refresh', False)

        try:
            # Step 1: Fetch fresh Google Business data (if enabled)
            fetch_result = None
            if fetch_google:
                logger.info(f"Fetching Google Business data for company {company.id}...")
                fetch_result = gbp_fetch_google_data(db, company.id, force_refresh=force_refresh)
                if not fetch_result.get('success') and not fetch_result.get('data', {}).get('cached'):
                    # A failed fetch is not fatal: log and audit the stored data
                    logger.warning(f"Google fetch warning for company {company.id}: {fetch_result.get('error')}")

            # Step 2: Run the audit
            result = gbp_audit_company(db, company.id, save=save_result)

            # Flatten field status objects into JSON-serialisable dicts
            fields_response = {
                field_name: {
                    'status': field_status.status,
                    'value': str(field_status.value) if field_status.value is not None else None,
                    'score': field_status.score,
                    'max_score': field_status.max_score,
                    'recommendation': field_status.recommendation
                }
                for field_name, field_status in result.fields.items()
            }

            # Map the numeric score onto a category label
            score = result.completeness_score
            if score >= 90:
                score_category = 'excellent'
            elif score >= 70:
                score_category = 'good'
            elif score >= 50:
                score_category = 'needs_work'
            else:
                score_category = 'poor'

            response_data = {
                'success': True,
                'message': f'Audyt GBP dla firmy "{company.name}" został zakończony pomyślnie.',
                'company_id': company.id,
                'company_name': company.name,
                'company_slug': company.slug,
                'audit_version': GBP_AUDIT_VERSION,
                'triggered_by': current_user.email,
                'triggered_at': datetime.now().isoformat(),
                'saved': save_result,
                'audit': {
                    'completeness_score': result.completeness_score,
                    'score_category': score_category,
                    'fields_status': fields_response,
                    'recommendations': result.recommendations,
                    'photo_count': result.photo_count,
                    'logo_present': result.logo_present,
                    'cover_photo_present': result.cover_photo_present,
                    'review_count': result.review_count,
                    'average_rating': float(result.average_rating) if result.average_rating else None,
                    'google_place_id': result.google_place_id
                }
            }

            # Include Google fetch results if a fetch was performed
            if fetch_result:
                response_data['google_fetch'] = {
                    'success': fetch_result.get('success', False),
                    'steps': fetch_result.get('steps', []),
                    'data': fetch_result.get('data', {}),
                    'error': fetch_result.get('error')
                }

            return jsonify(response_data), 200

        except ValueError as e:
            return jsonify({
                'success': False,
                'error': str(e),
                'company_id': company.id if company else None
            }), 400
        except Exception as e:
            # logger.exception records the traceback alongside the message
            logger.exception(f"GBP audit error for company {company.id}: {e}")
            return jsonify({
                'success': False,
                'error': f'Błąd podczas wykonywania audytu: {str(e)}',
                'company_id': company.id,
                'company_name': company.name
            }), 500
    finally:
        db.close()
# ============================================================
# SEO AUDIT USER-FACING DASHBOARD
# ============================================================
@app.route('/audit/seo/<slug>')
@login_required
def seo_audit_dashboard(slug):
    """
    User-facing SEO audit dashboard for a single company.

    Shows the PageSpeed scores (SEO, Performance, Accessibility, Best
    Practices) from the most recently audited website analysis row.

    Access control:
        Admins may view any company; other users only their own.

    Args:
        slug: Company slug identifier

    Returns:
        Rendered seo_audit.html, or a redirect to the dashboard (with a
        flash message) when the company is missing or access is denied.
    """
    db = SessionLocal()
    try:
        company = db.query(Company).filter_by(slug=slug, status='active').first()
        if not company:
            flash('Firma nie została znaleziona.', 'error')
            return redirect(url_for('dashboard'))

        # Non-admins are limited to the company linked to their account
        if not current_user.is_admin and current_user.company_id != company.id:
            flash('Brak uprawnień. Możesz przeglądać audyt tylko własnej firmy.', 'error')
            return redirect(url_for('dashboard'))

        # Newest analysis first, ordered by SEO audit timestamp
        analysis_query = db.query(CompanyWebsiteAnalysis).filter(
            CompanyWebsiteAnalysis.company_id == company.id
        )
        analysis = analysis_query.order_by(
            CompanyWebsiteAnalysis.seo_audited_at.desc()
        ).first()

        # Only expose data when an SEO audit has actually been recorded
        if analysis and analysis.seo_audited_at:
            seo_data = {
                'seo_score': analysis.pagespeed_seo_score,
                'performance_score': analysis.pagespeed_performance_score,
                'accessibility_score': analysis.pagespeed_accessibility_score,
                'best_practices_score': analysis.pagespeed_best_practices_score,
                'audited_at': analysis.seo_audited_at,
                'audit_version': analysis.seo_audit_version,
                'url': analysis.website_url
            }
        else:
            seo_data = None

        can_audit = current_user.is_admin or current_user.company_id == company.id

        logger.info(f"SEO audit dashboard viewed by {current_user.email} for company: {company.name}")

        return render_template(
            'seo_audit.html',
            company=company,
            seo_data=seo_data,
            can_audit=can_audit
        )
    finally:
        db.close()
# ============================================================
# SOCIAL MEDIA AUDIT USER-FACING DASHBOARD
# ============================================================
@app.route('/audit/social/<slug>')
@login_required
def social_audit_dashboard(slug):
    """
    User-facing Social Media audit dashboard for a single company.

    Builds a per-platform view of the stored social profiles and a presence
    score: tracked platforms that have a profile divided by all tracked
    platforms, as an integer percentage.

    Access control:
        Admins may view any company; other users only their own.

    Args:
        slug: Company URL slug

    Returns:
        Rendered social_audit.html, or a redirect to the dashboard (with a
        flash message) when the company is missing or access is denied.
    """
    db = SessionLocal()
    try:
        company = db.query(Company).filter_by(slug=slug, status='active').first()
        if not company:
            flash('Firma nie została znaleziona.', 'error')
            return redirect(url_for('dashboard'))

        if not current_user.is_admin and current_user.company_id != company.id:
            flash('Brak uprawnień do wyświetlenia audytu social media tej firmy.', 'error')
            return redirect(url_for('dashboard'))

        social_profiles = db.query(CompanySocialMedia).filter(
            CompanySocialMedia.company_id == company.id
        ).all()

        # Platforms that count towards the presence score
        all_platforms = ['facebook', 'instagram', 'linkedin', 'youtube', 'twitter', 'tiktok']

        # Keyed by platform; a later row for the same platform replaces an earlier one
        profiles_dict = {
            row.platform: {
                'url': row.url,
                'is_valid': row.is_valid,
                'check_status': row.check_status,
                'page_name': row.page_name,
                'followers_count': row.followers_count,
                'verified_at': row.verified_at,
                'last_checked_at': row.last_checked_at
            }
            for row in social_profiles
        }

        total_platforms = len(all_platforms)
        platforms_with_profiles = sum(1 for name in all_platforms if name in profiles_dict)
        if total_platforms > 0:
            score = int((platforms_with_profiles / total_platforms) * 100)
        else:
            score = 0

        social_data = {
            'profiles': profiles_dict,
            'all_platforms': all_platforms,
            'platforms_count': platforms_with_profiles,
            'total_platforms': total_platforms,
            'score': score
        }

        can_audit = current_user.is_admin or current_user.company_id == company.id

        logger.info(f"Social Media audit dashboard viewed by {current_user.email} for company: {company.name}")

        return render_template(
            'social_audit.html',
            company=company,
            social_data=social_data,
            can_audit=can_audit
        )
    finally:
        db.close()
@app.route('/api/social/audit', methods=['POST'])
@login_required
@limiter.limit("10 per hour")
def api_social_audit_trigger():
    """
    API: Trigger a Social Media audit for a company.

    Sends an HTTP HEAD request to every stored social profile URL of the
    company and records the outcome in check_status / is_valid /
    last_checked_at, then commits.

    Request JSON body (must be a JSON object):
        company_id: Company ID (integer), OR
        slug: Company slug (string)

    Returns:
        200 with profile counts, presence score and per-profile errors,
        400 for a missing/invalid body or missing identifier,
        403 when a non-admin targets a company other than their own,
        404 when no active company matches,
        500 on an unexpected error (the session is rolled back).
    """
    import requests as http_requests

    # silent=True turns malformed or non-JSON bodies into None; the dict check
    # rejects valid-but-non-object JSON (e.g. a list) that would otherwise
    # crash on data.get() below with an unhandled 500.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({
            'success': False,
            'error': 'Brak danych w żądaniu. Podaj company_id lub slug.'
        }), 400

    company_id = data.get('company_id')
    slug = data.get('slug')

    if not company_id and not slug:
        return jsonify({
            'success': False,
            'error': 'Podaj company_id lub slug firmy do audytu.'
        }), 400

    db = SessionLocal()
    try:
        # Find company by ID or slug (ID takes precedence)
        if company_id:
            company = db.query(Company).filter_by(id=company_id, status='active').first()
        else:
            company = db.query(Company).filter_by(slug=slug, status='active').first()

        if not company:
            return jsonify({
                'success': False,
                'error': 'Firma nie znaleziona lub nieaktywna.'
            }), 404

        # Access control: admin can audit all, users only their company
        if not current_user.is_admin and current_user.company_id != company.id:
            return jsonify({
                'success': False,
                'error': 'Brak uprawnień do audytu social media tej firmy.'
            }), 403

        logger.info(f"Social Media audit triggered by {current_user.email} for company: {company.name} (ID: {company.id})")

        profiles = db.query(CompanySocialMedia).filter(
            CompanySocialMedia.company_id == company.id
        ).all()

        verified_count = 0
        errors = []

        # NOTE(review): the server requests whatever URL is stored on the
        # profile; if those URLs can be edited by users, consider restricting
        # them to known social hosts — confirm where profile.url comes from.
        for profile in profiles:
            try:
                response = http_requests.head(
                    profile.url,
                    timeout=10,
                    allow_redirects=True,
                    headers={'User-Agent': 'Mozilla/5.0 (compatible; NordaBizBot/1.0)'}
                )
                status_code = response.status_code

                if status_code == 404:
                    profile.is_valid = False
                    profile.check_status = '404'
                else:
                    # Any non-404 response is treated as a reachable profile
                    profile.is_valid = True
                    if status_code == 200:
                        profile.check_status = 'ok'
                    elif status_code in (301, 302, 303, 307, 308):
                        profile.check_status = 'redirect'
                    else:
                        profile.check_status = f'http_{status_code}'
                    verified_count += 1
            except http_requests.exceptions.Timeout:
                # is_valid is left untouched when the check itself fails
                profile.check_status = 'timeout'
                errors.append(f'{profile.platform}: timeout')
            except http_requests.exceptions.ConnectionError:
                profile.check_status = 'connection_error'
                errors.append(f'{profile.platform}: connection error')
            except Exception as e:
                profile.check_status = 'error'
                errors.append(f'{profile.platform}: {str(e)[:50]}')

            # Every outcome, success or failure, stamps the check time
            profile.last_checked_at = datetime.now()

        db.commit()

        # Presence score: tracked platforms with a profile / all tracked platforms
        all_platforms = ['facebook', 'instagram', 'linkedin', 'youtube', 'twitter', 'tiktok']
        present_platforms = {p.platform for p in profiles}
        platforms_with_profiles = sum(1 for name in all_platforms if name in present_platforms)
        score = int((platforms_with_profiles / len(all_platforms)) * 100)

        return jsonify({
            'success': True,
            'message': f'Audyt Social Media zakończony. Zweryfikowano {verified_count} z {len(profiles)} profili.',
            'company_id': company.id,
            'company_name': company.name,
            'profiles_count': len(profiles),
            'verified_count': verified_count,
            'score': score,
            'errors': errors if errors else None
        }), 200

    except Exception as e:
        # logger.exception records the traceback alongside the message
        logger.exception(f"Social Media audit error for company {slug or company_id}: {e}")
        db.rollback()
        return jsonify({
            'success': False,
            'error': f'Błąd podczas audytu: {str(e)}'
        }), 500
    finally:
        db.close()
# ============================================================
# GBP AUDIT USER-FACING DASHBOARD
# ============================================================
@app.route('/audit/gbp/<slug>')
@login_required
def gbp_audit_dashboard(slug):
    """
    User-facing GBP audit dashboard for a single company.

    Loads the latest stored Google Business Profile audit (if any) and
    renders it; the page is rendered even when no audit exists yet.

    Access control:
        Admins may view any company; other users only their own.

    Args:
        slug: Company slug identifier

    Returns:
        Rendered gbp_audit.html, or a redirect to the dashboard (with a
        flash message) when the service is unavailable, the company is
        missing, or access is denied.
    """
    if not GBP_AUDIT_AVAILABLE:
        flash('Usługa audytu Google Business Profile jest tymczasowo niedostępna.', 'error')
        return redirect(url_for('dashboard'))

    db = SessionLocal()
    try:
        company = db.query(Company).filter_by(slug=slug, status='active').first()
        if not company:
            flash('Firma nie została znaleziona.', 'error')
            return redirect(url_for('dashboard'))

        # Non-admins are limited to the company linked to their account
        if not current_user.is_admin and current_user.company_id != company.id:
            flash('Brak uprawnień. Możesz przeglądać audyt tylko własnej firmy.', 'error')
            return redirect(url_for('dashboard'))

        # May be empty; the template is handed the value either way
        audit = gbp_get_company_audit(db, company.id)

        can_audit = current_user.is_admin or current_user.company_id == company.id

        logger.info(f"GBP audit dashboard viewed by {current_user.email} for company: {company.name}")

        template_context = dict(
            company=company,
            audit=audit,
            can_audit=can_audit,
            gbp_audit_available=GBP_AUDIT_AVAILABLE,
            gbp_audit_version=GBP_AUDIT_VERSION
        )
        return render_template('gbp_audit.html', **template_context)
    finally:
        db.close()
@app.route('/api/check-email', methods=['POST'])
def api_check_email():
    """
    API: Check if an email address is available.

    Request JSON body:
        email: Address to check (string)

    Returns:
        200 with 'available' (true when no user has this email) and the
        normalised email; 400 when the body is missing, is not a JSON
        object, or the email is empty / not a string / fails validation.
    """
    # Tolerate missing, malformed or non-object bodies and non-string values
    # instead of raising AttributeError (an unhandled 500) on .get()/.strip().
    data = request.get_json(silent=True)
    raw_email = data.get('email') if isinstance(data, dict) else None
    email = raw_email.strip().lower() if isinstance(raw_email, str) else ''

    if not email or not validate_email(email):
        return jsonify({
            'available': False,
            'error': 'Nieprawidłowy format email'
        }), 400

    db = SessionLocal()
    try:
        existing_user = db.query(User).filter_by(email=email).first()
        return jsonify({
            'available': existing_user is None,
            'email': email
        })
    finally:
        db.close()
@app.route('/api/verify-nip', methods=['POST'])
def api_verify_nip():
    """
    API: Verify a NIP and report whether it belongs to an active company.

    Request JSON body:
        nip: Exactly 10 digits (string)

    Returns:
        200 with is_member plus the company name and ID (both null when no
        active company has this NIP); 400 when the body is missing, is not
        a JSON object, or the NIP is not a 10-digit string.
    """
    # Tolerate missing, malformed or non-object bodies and non-string values
    # instead of raising AttributeError (an unhandled 500) on .get()/.strip().
    data = request.get_json(silent=True)
    raw_nip = data.get('nip') if isinstance(data, dict) else None
    nip = raw_nip.strip() if isinstance(raw_nip, str) else ''

    if not nip or not re.match(r'^\d{10}$', nip):
        return jsonify({
            'success': False,
            'error': 'Nieprawidłowy format NIP'
        }), 400

    db = SessionLocal()
    try:
        company = db.query(Company).filter_by(nip=nip, status='active').first()
        return jsonify({
            'success': True,
            'is_member': company is not None,
            'company_name': company.name if company else None,
            'company_id': company.id if company else None
        })
    finally:
        db.close()
@app.route('/api/verify-krs', methods=['GET', 'POST'])
def api_verify_krs():
    """
    API: Verify company data from KRS Open API (prs.ms.gov.pl).

    GET /api/verify-krs?krs=0000817317
    POST /api/verify-krs with JSON body: {"krs": "0000817317"}

    The KRS number must be 7-10 digits and is left-padded with zeros to 10
    digits before the lookup.

    Returns:
        200 with the registry data, a formatted address and whether a
        company with this KRS exists in the local database,
        400 for an invalid KRS format,
        404 when the registry has no such entity,
        500 when the lookup raises.
    """
    if request.method == 'GET':
        raw_krs = request.args.get('krs', '')
    else:
        # A non-object JSON body (e.g. a list) is treated as "no krs given"
        data = request.get_json(silent=True)
        raw_krs = data.get('krs', '') if isinstance(data, dict) else ''

    # JSON clients may send the number as an integer; coerce rather than
    # crash on .strip(). Anything that is not 7-10 digits is rejected below.
    krs = str(raw_krs).strip() if raw_krs is not None else ''

    if not krs or not re.match(r'^\d{7,10}$', krs):
        return jsonify({
            'success': False,
            'error': 'Nieprawidłowy format KRS (wymagane 7-10 cyfr)'
        }), 400

    # Normalize to 10 digits
    krs_normalized = krs.zfill(10)

    try:
        krs_data = krs_api_service.get_company_from_krs(krs_normalized)

        if krs_data is None:
            return jsonify({
                'success': False,
                'error': f'Nie znaleziono podmiotu o KRS {krs_normalized} w rejestrze',
                'krs': krs_normalized
            }), 404

        # Check if the company exists in our database
        db = SessionLocal()
        try:
            our_company = db.query(Company).filter_by(krs=krs_normalized).first()
            is_member = our_company is not None
            company_id = our_company.id if our_company else None
        finally:
            db.close()

        return jsonify({
            'success': True,
            'krs': krs_normalized,
            'is_norda_member': is_member,
            'company_id': company_id,
            'data': krs_data.to_dict(),
            'formatted_address': krs_api_service.format_address(krs_data),
            'source': 'KRS Open API (prs.ms.gov.pl)',
            'note': 'Dane osobowe (imiona, nazwiska) są zanonimizowane w Open API'
        })

    except Exception as e:
        # Previously returned 500 without leaving any trace in the logs
        logger.exception(f"KRS verification error for KRS {krs_normalized}: {e}")
        return jsonify({
            'success': False,
            'error': f'Błąd podczas pobierania danych z KRS: {str(e)}'
        }), 500
@app.route('/api/company/<int:company_id>/refresh-krs', methods=['POST'])
@login_required
def api_refresh_company_krs(company_id):
    """
    API: Refresh company data from KRS Open API.

    Overwrites nip, regon (first 9 characters), address and city on the
    company record when the registry value differs, stamps krs_verified_at
    and commits.

    Args:
        company_id: ID of the company to refresh (from the URL path)

    Returns:
        200 with the dict of changed fields and the raw registry data,
        400 when the company has no KRS number,
        404 when the company or the registry entry is not found,
        500 on an unexpected error (the session is rolled back).

    NOTE(review): only login is required here; unlike the audit endpoints
    there is no admin / own-company check — confirm this is intended.
    """
    db = SessionLocal()
    try:
        company = db.query(Company).filter_by(id=company_id).first()
        if not company:
            return jsonify({
                'success': False,
                'error': 'Firma nie znaleziona'
            }), 404

        if not company.krs:
            return jsonify({
                'success': False,
                'error': 'Firma nie ma numeru KRS'
            }), 400

        krs_data = krs_api_service.get_company_from_krs(company.krs)

        if krs_data is None:
            return jsonify({
                'success': False,
                'error': f'Nie znaleziono podmiotu o KRS {company.krs} w rejestrze'
            }), 404

        # Update company data (only non-personal data); `updates` mirrors
        # every field reported back to the caller.
        updates = {}

        if krs_data.nip and krs_data.nip != company.nip:
            updates['nip'] = krs_data.nip
            company.nip = krs_data.nip

        if krs_data.regon:
            regon_9 = krs_data.regon[:9]
            if regon_9 != company.regon:
                updates['regon'] = regon_9
                company.regon = regon_9

        new_address = krs_api_service.format_address(krs_data)
        if new_address and new_address != company.address:
            updates['address'] = new_address
            company.address = new_address

        if krs_data.miejscowosc and krs_data.miejscowosc != company.city:
            updates['city'] = krs_data.miejscowosc
            company.city = krs_data.miejscowosc

        if krs_data.kapital_zakladowy:
            # NOTE(review): reported in `updates` (and counted in the
            # message) but never written to the company record; the
            # Company model may need a matching field.
            updates['kapital_zakladowy'] = krs_data.kapital_zakladowy

        company.krs_verified_at = datetime.utcnow()

        db.commit()

        return jsonify({
            'success': True,
            'company_id': company_id,
            'updates': updates,
            'krs_data': krs_data.to_dict(),
            'message': f'Zaktualizowano {len(updates)} pól' if updates else 'Dane są aktualne'
        })

    except Exception as e:
        db.rollback()
        # Previously returned 500 without leaving any trace in the logs
        logger.exception(f"KRS refresh error for company {company_id}: {e}")
        return jsonify({
            'success': False,
            'error': f'Błąd podczas aktualizacji: {str(e)}'
        }), 500
    finally:
        db.close()
@app.route('/api/model-info', methods=['GET'])
def api_model_info():
    """
    API: Report the AI model currently in use.

    Returns:
        200 with the model name and provider when the Gemini service object
        is available, otherwise 500 with an error message.
    """
    service = gemini_service.get_gemini_service()
    if not service:
        return jsonify({
            'success': False,
            'error': 'AI service not initialized'
        }), 500

    info = {
        'success': True,
        'model': service.model_name,
        'provider': 'Google Gemini'
    }
    return jsonify(info)
# ============================================================
# AI CHAT FEEDBACK & ANALYTICS
# ============================================================
@app.route('/api/chat/feedback', methods=['POST'])
@login_required
def chat_feedback():
    """
    API: Submit feedback for an AI response.

    Request JSON body (must be a JSON object):
        message_id: ID of the rated chat message (required)
        rating: 1 = thumbs down, 2 = thumbs up (required)
        comment, is_helpful, is_accurate, found_company, original_query,
        expected_companies: optional details; a detailed feedback record
        is created or updated when is_helpful or comment is supplied.

    Returns:
        200 on success,
        400 for a missing/invalid body, message_id or rating,
        403 when the message is not in one of the user's conversations,
        404 when the message does not exist,
        500 on an unexpected error.
    """
    try:
        # A missing, malformed or non-object body is a client error (400),
        # not an AttributeError surfacing as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Invalid data'}), 400

        message_id = data.get('message_id')
        rating = data.get('rating')  # 1 = thumbs down, 2 = thumbs up

        if not message_id or rating not in [1, 2]:
            return jsonify({'success': False, 'error': 'Invalid data'}), 400

        db = SessionLocal()
        try:
            # Verify message exists and belongs to the user's conversation
            message = db.query(AIChatMessage).filter_by(id=message_id).first()
            if not message:
                return jsonify({'success': False, 'error': 'Message not found'}), 404

            conversation = db.query(AIChatConversation).filter_by(
                id=message.conversation_id,
                user_id=current_user.id
            ).first()
            if not conversation:
                return jsonify({'success': False, 'error': 'Not authorized'}), 403

            # Update the rating stored directly on the message
            message.feedback_rating = rating
            message.feedback_at = datetime.now()
            message.feedback_comment = data.get('comment', '')

            # Create or update the detailed feedback record if details were sent
            if data.get('is_helpful') is not None or data.get('comment'):
                existing_feedback = db.query(AIChatFeedback).filter_by(message_id=message_id).first()
                if existing_feedback:
                    existing_feedback.rating = rating
                    existing_feedback.is_helpful = data.get('is_helpful')
                    existing_feedback.is_accurate = data.get('is_accurate')
                    existing_feedback.found_company = data.get('found_company')
                    existing_feedback.comment = data.get('comment')
                else:
                    feedback = AIChatFeedback(
                        message_id=message_id,
                        user_id=current_user.id,
                        rating=rating,
                        is_helpful=data.get('is_helpful'),
                        is_accurate=data.get('is_accurate'),
                        found_company=data.get('found_company'),
                        comment=data.get('comment'),
                        original_query=data.get('original_query'),
                        expected_companies=data.get('expected_companies')
                    )
                    db.add(feedback)

            db.commit()
            logger.info(f"Feedback received: message_id={message_id}, rating={rating}")

            return jsonify({'success': True})
        finally:
            db.close()

    except Exception as e:
        logger.error(f"Error saving feedback: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/admin/chat-analytics')
@login_required
def chat_analytics():
    """
    Admin dashboard for chat analytics.

    Collects conversation/message totals, feedback counts, the 20 most
    recently rated messages, the 50 most recent user messages and the
    satisfaction rate (positive ratings as a percentage of all ratings,
    0 when nothing has been rated). Non-admins are redirected to the
    dashboard with a flash message.
    """
    if not current_user.is_admin:
        flash('Brak uprawnień do tej strony.', 'error')
        return redirect(url_for('dashboard'))

    db = SessionLocal()
    try:
        from sqlalchemy import func, desc

        has_rating = AIChatMessage.feedback_rating.isnot(None)

        # Totals
        total_conversations = db.query(AIChatConversation).count()
        total_messages = db.query(AIChatMessage).count()
        total_user_messages = db.query(AIChatMessage).filter_by(role='user').count()

        # Feedback counts (rating 2 = positive, 1 = negative)
        feedback_count = db.query(AIChatMessage).filter(has_rating).count()
        positive_feedback = db.query(AIChatMessage).filter_by(feedback_rating=2).count()
        negative_feedback = db.query(AIChatMessage).filter_by(feedback_rating=1).count()

        # Latest rated messages
        rated_messages = db.query(AIChatMessage).filter(has_rating)
        recent_feedback = rated_messages.order_by(
            desc(AIChatMessage.feedback_at)
        ).limit(20).all()

        # Latest user queries
        user_messages = db.query(AIChatMessage).filter_by(role='user')
        recent_queries = user_messages.order_by(
            desc(AIChatMessage.created_at)
        ).limit(50).all()

        if feedback_count > 0:
            satisfaction_rate = positive_feedback / feedback_count * 100
        else:
            satisfaction_rate = 0

        return render_template(
            'admin/chat_analytics.html',
            total_conversations=total_conversations,
            total_messages=total_messages,
            total_user_messages=total_user_messages,
            feedback_count=feedback_count,
            positive_feedback=positive_feedback,
            negative_feedback=negative_feedback,
            satisfaction_rate=round(satisfaction_rate, 1),
            recent_feedback=recent_feedback,
            recent_queries=recent_queries
        )
    finally:
        db.close()
@app.route('/api/admin/chat-stats')
@login_required
def api_chat_stats():
    """
    API: Get chat statistics for the admin dashboard.

    Returns:
        200 with 'daily_queries': one {'date', 'count'} entry per day for
        user messages created in the last 7 days, ordered by date;
        403 for non-admin users.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Not authorized'}), 403

    db = SessionLocal()
    try:
        from sqlalchemy import func, desc
        from datetime import timedelta

        # Stats for last 7 days
        week_ago = datetime.now() - timedelta(days=7)
        daily_stats = db.query(
            func.date(AIChatMessage.created_at).label('date'),
            func.count(AIChatMessage.id).label('count')
        ).filter(
            AIChatMessage.created_at >= week_ago,
            AIChatMessage.role == 'user'
        ).group_by(
            func.date(AIChatMessage.created_at)
        ).order_by('date').all()

        # Unpack rows positionally: on SQLAlchemy 1.4+/2.x result rows are
        # Sequence-like, so attribute access `row.count` resolves to the
        # inherited count() method instead of the 'count' column and the
        # response cannot be serialised.
        daily_queries = [
            {'date': str(day), 'count': query_count}
            for day, query_count in daily_stats
        ]

        return jsonify({
            'success': True,
            'daily_queries': daily_queries
        })
    finally:
        db.close()
# ============================================================
# DEBUG PANEL (Admin only)
# ============================================================
@app.route('/admin/debug')
@login_required
def debug_panel():
    """
    Real-time debug panel page for monitoring app activity.

    Admins get the admin/debug.html template; everyone else is redirected
    to the dashboard with a flash message.
    """
    if current_user.is_admin:
        return render_template('admin/debug.html')

    flash('Brak uprawnień do tej strony.', 'error')
    return redirect(url_for('dashboard'))
@app.route('/api/admin/logs')
@login_required
def api_get_logs():
    """
    API: Get recent log entries from the in-memory debug buffer.

    Query parameters:
        level: Keep only entries with this level (case-insensitive),
            e.g. DEBUG, INFO, WARNING, ERROR
        since: Keep only entries whose timestamp string compares greater
            than this value (ISO timestamp expected)
        limit: Maximum number of entries to return, clamped to 0..500;
            defaults to 100, also when the value is not an integer

    Returns:
        200 with the newest matching entries and the total buffer size;
        403 for non-admin users.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Not authorized'}), 403

    level = request.args.get('level', '')
    since = request.args.get('since', '')

    # type=int falls back to the default on non-numeric input instead of
    # raising ValueError (an unhandled 500).
    limit = request.args.get('limit', 100, type=int)
    limit = max(0, min(limit, 500))

    logs = list(debug_handler.logs)

    if level:
        wanted_level = level.upper()
        logs = [entry for entry in logs if entry['level'] == wanted_level]

    if since:
        logs = [entry for entry in logs if entry['timestamp'] > since]

    # logs[-0:] would be the whole list, so a zero limit is handled explicitly
    logs = logs[-limit:] if limit else []

    return jsonify({
        'success': True,
        'logs': logs,
        'total': len(debug_handler.logs)
    })
@app.route('/api/admin/logs/stream')
@login_required
def api_logs_stream():
    """
    SSE endpoint for real-time log streaming.

    On connect, every entry currently in the debug buffer is sent; after
    that the buffer is polled every 0.5 s and entries that appeared since
    the last poll are sent as `data: <json>` events. Non-admins get 403.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Not authorized'}), 403

    import time

    def generate():
        # Track the last entry sent by identity rather than by buffer
        # length: a length cursor stops advancing once the buffer no longer
        # grows (presumably a bounded deque that evicts old entries —
        # confirm in DebugLogHandler) or after the buffer is cleared.
        # Assumes each log entry is a distinct object (entries are indexed
        # like dicts elsewhere in this file).
        last_sent = None
        while True:
            snapshot = list(debug_handler.logs)

            first_new = 0
            if last_sent is not None:
                # Search from the newest end; if the marker is gone
                # (evicted or cleared) everything in the snapshot is new.
                for idx in range(len(snapshot) - 1, -1, -1):
                    if snapshot[idx] is last_sent:
                        first_new = idx + 1
                        break

            for entry in snapshot[first_new:]:
                yield f"data: {json.dumps(entry)}\n\n"

            if snapshot:
                last_sent = snapshot[-1]

            time.sleep(0.5)

    return Response(generate(), mimetype='text/event-stream')
@app.route('/api/admin/logs/clear', methods=['POST'])
@login_required
def api_clear_logs():
    """API: Empty the in-memory log buffer (admin only).

    Returns a JSON success flag, or a 403 JSON error for non-admin users.
    """
    if current_user.is_admin:
        debug_handler.logs.clear()
        # Logged after the clear, so this message lands in the fresh buffer.
        logger.info("Log buffer cleared by admin")
        return jsonify({'success': True})

    return jsonify({'success': False, 'error': 'Not authorized'}), 403
@app.route('/api/admin/test-log', methods=['POST'])
@login_required
def api_test_log():
    """API: Emit one sample log message at each of four levels (admin only).

    Returns a JSON confirmation, or a 403 JSON error for non-admin users.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'error': 'Not authorized'}), 403

    # One message per level, emitted in ascending severity order.
    samples = (
        (logger.debug, "Test DEBUG message"),
        (logger.info, "Test INFO message"),
        (logger.warning, "Test WARNING message"),
        (logger.error, "Test ERROR message"),
    )
    for emit, text in samples:
        emit(text)

    return jsonify({'success': True, 'message': 'Test logs generated'})
@app.route('/admin/digital-maturity')
@login_required
def digital_maturity_dashboard():
    """Render the admin dashboard of digital maturity assessment results.

    Loads every company that has both a digital-maturity record and a
    website-analysis record (inner joins) with an overall score above zero,
    ordered by overall score descending, then derives summary figures and
    top/bottom lists for the template. Non-admin users are redirected to
    the dashboard with a flash message.
    """
    if not current_user.is_admin:
        flash('Brak uprawnień do tej strony.', 'error')
        return redirect(url_for('dashboard'))

    def opportunity_value(row):
        # Missing opportunity values count as zero.
        return float(row.total_opportunity_value or 0)

    def overall_score(row):
        return row.overall_score

    db = SessionLocal()
    try:
        from sqlalchemy import func, desc

        # Get all companies with maturity data
        rows = (
            db.query(
                Company.id,
                Company.name,
                Company.slug,
                Company.website,
                CompanyDigitalMaturity.overall_score,
                CompanyDigitalMaturity.online_presence_score,
                CompanyDigitalMaturity.sales_readiness,
                CompanyDigitalMaturity.total_opportunity_value,
                CompanyWebsiteAnalysis.opportunity_score,
                CompanyWebsiteAnalysis.has_blog,
                CompanyWebsiteAnalysis.has_portfolio,
                CompanyWebsiteAnalysis.has_contact_form,
                CompanyWebsiteAnalysis.content_richness_score,
                CompanyDigitalMaturity.critical_gaps,
                CompanyWebsiteAnalysis.missing_features,
            )
            .join(CompanyDigitalMaturity, Company.id == CompanyDigitalMaturity.company_id)
            .join(CompanyWebsiteAnalysis, Company.id == CompanyWebsiteAnalysis.company_id)
            .filter(CompanyDigitalMaturity.overall_score > 0)
            .order_by(desc(CompanyDigitalMaturity.overall_score))
            .all()
        )

        # Summary statistics
        total_analyzed = len(rows)
        if total_analyzed:
            avg_score = round(sum(overall_score(r) for r in rows) / total_analyzed, 1)
        else:
            avg_score = 0
        total_opportunity = sum(opportunity_value(r) for r in rows)
        warm_leads_count = sum(1 for r in rows if r.sales_readiness == 'warm')
        cold_leads_count = sum(1 for r in rows if r.sales_readiness == 'cold')

        # The query is already ordered best-first, so the head is the top 10.
        top_performers = rows[:10]
        bottom_performers = sorted(rows, key=overall_score)[:10]

        # Ten largest opportunity values
        top_opportunities = sorted(rows, key=opportunity_value, reverse=True)[:10]

        return render_template(
            'admin/digital_maturity.html',
            total_analyzed=total_analyzed,
            avg_score=avg_score,
            total_opportunity=total_opportunity,
            warm_leads_count=warm_leads_count,
            cold_leads_count=cold_leads_count,
            top_performers=top_performers,
            bottom_performers=bottom_performers,
            top_opportunities=top_opportunities,
            all_companies=rows,
        )
    finally:
        db.close()
@app.route('/admin/social-media')
@login_required
def admin_social_media():
    """Render the admin dashboard for social media analytics.

    Collects per-platform counts of valid profiles, the set of platforms
    each company has, groupings of companies by platform combination, a
    detailed list of all profile entries, and counts of profiles verified
    within the last 30 days or not verified for more than 90 days.
    Non-admin users are redirected to the dashboard with a flash message.
    """
    if not current_user.is_admin:
        flash('Brak uprawnień do tej strony.', 'error')
        return redirect(url_for('dashboard'))

    db = SessionLocal()
    try:
        from datetime import datetime, timedelta
        from sqlalchemy import func, case, distinct
        from database import CompanySocialMedia

        # Per-platform totals over valid profiles only
        platform_stats = (
            db.query(
                CompanySocialMedia.platform,
                func.count(CompanySocialMedia.id).label('count'),
                func.count(distinct(CompanySocialMedia.company_id)).label('companies'),
            )
            .filter(CompanySocialMedia.is_valid == True)
            .group_by(CompanySocialMedia.platform)
            .all()
        )

        # One row per company with the distinct platforms of its valid
        # profiles; the outer join keeps companies that have none.
        company_platforms = (
            db.query(
                Company.id,
                Company.name,
                Company.slug,
                func.array_agg(distinct(CompanySocialMedia.platform)).label('platforms'),
            )
            .outerjoin(
                CompanySocialMedia,
                (Company.id == CompanySocialMedia.company_id) & (CompanySocialMedia.is_valid == True),
            )
            .group_by(Company.id, Company.name, Company.slug)
            .all()
        )

        total_companies = len(company_platforms)

        # Split companies by whether the aggregated platform list has a real
        # first element (an empty list or a leading None means no profiles).
        companies_with_sm = []
        companies_without_sm = []
        for row in company_platforms:
            has_profiles = bool(row.platforms) and row.platforms[0] is not None
            if has_profiles:
                companies_with_sm.append(row)
            else:
                companies_without_sm.append(row)

        # Group companies by their exact combination of platforms
        platform_combos_raw = {}
        for row in companies_with_sm:
            names = sorted(p for p in row.platforms if p)
            label = ', '.join(names) if names else 'Brak'
            platform_combos_raw.setdefault(label, []).append(
                {'id': row.id, 'name': row.name, 'slug': row.slug}
            )

        # Largest groups first
        platform_combos = dict(
            sorted(platform_combos_raw.items(), key=lambda item: len(item[1]), reverse=True)
        )

        def only_on(platform):
            # Companies whose platform set is exactly this single platform.
            return [row for row in companies_with_sm if set(row.platforms) == {platform}]

        only_facebook = only_on('facebook')
        only_linkedin = only_on('linkedin')
        only_instagram = only_on('instagram')

        # Has all major (FB + LI + IG)
        major_platforms = {'facebook', 'linkedin', 'instagram'}
        has_all_major = [
            row for row in companies_with_sm
            if major_platforms.issubset(set(row.platforms or []))
        ]

        # Every social media entry together with its company name and slug
        all_entries = (
            db.query(
                CompanySocialMedia,
                Company.name.label('company_name'),
                Company.slug.label('company_slug'),
            )
            .join(Company)
            .order_by(Company.name, CompanySocialMedia.platform)
            .all()
        )

        # Freshness analysis based on the verified_at timestamp
        now = datetime.now()
        fresh_cutoff = now - timedelta(days=30)
        stale_cutoff = now - timedelta(days=90)
        fresh_30d = (
            db.query(func.count(CompanySocialMedia.id))
            .filter(CompanySocialMedia.verified_at >= fresh_cutoff)
            .scalar()
        )
        stale_90d = (
            db.query(func.count(CompanySocialMedia.id))
            .filter(CompanySocialMedia.verified_at < stale_cutoff)
            .scalar()
        )

        return render_template(
            'admin/social_media.html',
            platform_stats=platform_stats,
            total_companies=total_companies,
            companies_with_sm=len(companies_with_sm),
            companies_without_sm=companies_without_sm,
            platform_combos=platform_combos,
            only_facebook=only_facebook,
            only_linkedin=only_linkedin,
            only_instagram=only_instagram,
            has_all_major=has_all_major,
            all_entries=all_entries,
            fresh_30d=fresh_30d,
            stale_90d=stale_90d,
            now=now,
        )
    finally:
        db.close()
# ============================================================
# ERROR HANDLERS
# ============================================================
@app.errorhandler(404)
def not_found(error):
    """Serve the custom 404 page together with a 404 status code."""
    page = render_template('errors/404.html')
    return page, 404
@app.errorhandler(500)
def internal_error(error):
    """Serve the custom 500 page together with a 500 status code."""
    page = render_template('errors/500.html')
    return page, 500
# ============================================================
# MAIN
# ============================================================
if __name__ == '__main__':
    # Direct-execution entry point: the port comes from PORT (default 5000)
    # and debug mode is on only when FLASK_ENV is exactly 'development'.
    # The server listens on all interfaces.
    debug = os.getenv('FLASK_ENV') == 'development'
    port = int(os.getenv('PORT', 5000))

    logger.info(f"Starting Norda Biznes Hub on port {port}")
    app.run(debug=debug, port=port, host='0.0.0.0')