File size: 9,055 Bytes
9dd30d3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
import json
import sqlite3
from datetime import datetime
from typing import Dict, Optional, List

class UserProfileManager:
    """Manage user profiles stored in a SQLite database.

    Every public method opens its own short-lived connection to ``db_path``,
    runs a single statement inside a transaction and closes the connection
    again. Database errors are reported with ``print`` and converted into a
    falsy return value (``False``, ``None`` or ``[]``) instead of being
    raised to the caller.
    """

    # Columns update_profile() is allowed to change. Column names are
    # interpolated into the SQL text, so this whitelist is what keeps
    # caller-supplied keys out of the statement.
    _UPDATABLE_FIELDS = ('email', 'first_name', 'last_name', 'profile_data')

    # Columns returned by the list/search queries, in SELECT order.
    _SUMMARY_COLUMNS = ('id', 'username', 'email', 'first_name',
                        'last_name', 'created_at')

    def __init__(self, db_path="users.db"):
        """Remember the database location and make sure the table exists.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        self.init_database()

    def _execute(self, sql: str, params=()):
        """Run one statement in its own transaction.

        Returns:
            A ``(rows, rowcount)`` tuple: all fetched rows (empty for
            statements that produce none) and the cursor's ``rowcount``.

        Raises:
            sqlite3.Error: Propagated unchanged so callers can distinguish
                ``IntegrityError`` from other failures.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            # ``with conn`` commits on success and rolls back on error, but
            # it does NOT close the connection -- hence the ``finally``.
            with conn:
                cursor = conn.execute(sql, params)
                return cursor.fetchall(), cursor.rowcount
        finally:
            conn.close()

    @classmethod
    def _summary(cls, row) -> Dict:
        """Map a row selected with ``_SUMMARY_COLUMNS`` to a dict."""
        return dict(zip(cls._SUMMARY_COLUMNS, row))

    def init_database(self):
        """Create the ``user_profiles`` table if it does not exist yet.

        Failures are printed and swallowed, so construction never raises
        because of a database problem.
        """
        try:
            self._execute("""
                CREATE TABLE IF NOT EXISTS user_profiles (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username VARCHAR(50) UNIQUE NOT NULL,
                    email VARCHAR(100) UNIQUE NOT NULL,
                    first_name VARCHAR(50),
                    last_name VARCHAR(50),
                    profile_data TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
        except Exception as e:
            print(f"Database initialization error: {e}")

    def create_profile(self, username: str, email: str, first_name: str = "",
                       last_name: str = "",
                       additional_data: Optional[Dict] = None) -> bool:
        """Insert a new profile.

        Args:
            username: Unique user name.
            email: Unique e-mail address.
            first_name: Optional given name.
            last_name: Optional family name.
            additional_data: Extra attributes, stored as JSON in the
                ``profile_data`` column. ``None`` is stored as ``{}``.

        Returns:
            ``True`` when the row was inserted; ``False`` when the username
            or e-mail already exists or any other error occurred.
        """
        try:
            profile_data = json.dumps(additional_data or {})
            self._execute(
                """
                INSERT INTO user_profiles
                    (username, email, first_name, last_name, profile_data)
                VALUES (?, ?, ?, ?, ?)
                """,
                (username, email, first_name, last_name, profile_data),
            )
            return True
        except sqlite3.IntegrityError:
            return False  # Username or email already exists
        except Exception as e:
            print(f"Profile creation error: {e}")
            return False

    def get_profile(self, username: str) -> Optional[Dict]:
        """Fetch the full profile for ``username``.

        Returns:
            A dict with the keys ``id``, ``username``, ``email``,
            ``first_name``, ``last_name``, ``profile_data`` (decoded from
            JSON), ``created_at`` and ``updated_at``; ``None`` when no such
            user exists or the lookup failed.
        """
        try:
            rows, _ = self._execute(
                """
                SELECT id, username, email, first_name, last_name,
                       profile_data, created_at, updated_at
                FROM user_profiles
                WHERE username = ?
                """,
                (username,),
            )
            if not rows:
                return None

            # ``username`` is UNIQUE, so there is at most one row.
            row = rows[0]
            return {
                'id': row[0],
                'username': row[1],
                'email': row[2],
                'first_name': row[3],
                'last_name': row[4],
                'profile_data': json.loads(row[5] or '{}'),
                'created_at': row[6],
                'updated_at': row[7],
            }
        except Exception as e:
            print(f"Profile retrieval error: {e}")
            return None

    def update_profile(self, username: str, updates: Dict) -> bool:
        """Update selected columns of an existing profile.

        Only ``email``, ``first_name``, ``last_name`` and ``profile_data``
        may be changed; any other key in ``updates`` is ignored. A
        ``profile_data`` value is serialized to JSON before it is stored.

        Returns:
            ``True`` when a row was updated. ``False`` when ``updates``
            contains no permitted field, the user does not exist, a
            uniqueness constraint is violated, or another error occurred.
        """
        try:
            assignments = []
            params = []
            for field, value in updates.items():
                if field not in self._UPDATABLE_FIELDS:
                    continue
                if field == 'profile_data':
                    value = json.dumps(value)
                assignments.append(f"{field} = ?")
                params.append(value)

            if not assignments:
                return False  # No valid fields to update

            # Let SQLite stamp the row so ``updated_at`` uses the same
            # clock and text format as the CURRENT_TIMESTAMP column
            # defaults; a Python-side isoformat() string would neither
            # match nor sort consistently with them.
            assignments.append("updated_at = CURRENT_TIMESTAMP")
            params.append(username)  # bound to the WHERE clause

            query = (
                f"UPDATE user_profiles SET {', '.join(assignments)} "
                "WHERE username = ?"
            )
            _, rowcount = self._execute(query, params)
            return rowcount > 0  # True if row was updated
        except sqlite3.IntegrityError:
            return False  # Constraint violation (e.g., duplicate email)
        except Exception as e:
            print(f"Profile update error: {e}")
            return False

    def delete_profile(self, username: str) -> bool:
        """Delete the profile for ``username``.

        Returns:
            ``True`` when a row was removed, ``False`` when the user did not
            exist or the deletion failed.
        """
        try:
            _, rowcount = self._execute(
                "DELETE FROM user_profiles WHERE username = ?", (username,)
            )
            return rowcount > 0
        except Exception as e:
            print(f"Profile deletion error: {e}")
            return False

    def list_profiles(self, limit: int = 50, offset: int = 0) -> List[Dict]:
        """Return one page of profile summaries, newest first.

        Args:
            limit: Maximum number of rows to return.
            offset: Number of rows to skip.

        Returns:
            A list of dicts with the keys ``id``, ``username``, ``email``,
            ``first_name``, ``last_name`` and ``created_at``; an empty list
            on error.
        """
        try:
            # ``created_at`` only has one-second resolution; the ``id``
            # tie-breaker keeps page boundaries stable between calls.
            rows, _ = self._execute(
                """
                SELECT id, username, email, first_name, last_name, created_at
                FROM user_profiles
                ORDER BY created_at DESC, id DESC
                LIMIT ? OFFSET ?
                """,
                (limit, offset),
            )
            return [self._summary(row) for row in rows]
        except Exception as e:
            print(f"Profile listing error: {e}")
            return []

    def search_profiles(self, search_term: str) -> List[Dict]:
        """Find profiles whose username, e-mail or name contains a term.

        The term is matched with SQL ``LIKE`` as a substring of
        ``username``, ``email``, ``first_name`` or ``last_name``.

        Returns:
            Matching profile summaries ordered by username (same keys as
            ``list_profiles``); an empty list on error.
        """
        try:
            search_pattern = f"%{search_term}%"
            rows, _ = self._execute(
                """
                SELECT id, username, email, first_name, last_name, created_at
                FROM user_profiles
                WHERE username LIKE ? OR email LIKE ?
                   OR first_name LIKE ? OR last_name LIKE ?
                ORDER BY username
                """,
                (search_pattern,) * 4,
            )
            return [self._summary(row) for row in rows]
        except Exception as e:
            print(f"Profile search error: {e}")
            return []

# API endpoints for profile management
def handle_profile_update_api(request_data: Dict) -> Dict:
    """Handle an API request that updates a user profile.

    Args:
        request_data: Request payload. ``username`` names the profile to
            change and ``updates`` maps field names to their new values.

    Returns:
        A dict with a ``status`` code plus either ``message`` (success) or
        ``error`` (failure):

        * 400 -- ``username`` is missing, or ``updates`` is missing, empty
          or not a dict.
        * 404 -- no profile exists for ``username``.
        * 200 -- the profile was updated.
        * 500 -- the update was rejected by the profile manager or an
          unexpected exception occurred.
    """
    try:
        username = request_data.get('username')
        updates = request_data.get('updates', {})

        if not username:
            return {'error': 'Username is required', 'status': 400}

        # A malformed or empty payload is the client's mistake: reject it
        # before touching the database instead of reporting it as a 500.
        if not isinstance(updates, dict) or not updates:
            return {'error': 'Updates must be a non-empty object', 'status': 400}

        profile_manager = UserProfileManager()

        # Check if profile exists
        if not profile_manager.get_profile(username):
            return {'error': 'Profile not found', 'status': 404}

        # Attempt to update profile
        if profile_manager.update_profile(username, updates):
            return {'message': 'Profile updated successfully', 'status': 200}
        return {'error': 'Failed to update profile', 'status': 500}

    except Exception as e:
        return {'error': f'Internal server error: {str(e)}', 'status': 500}