MilanM commited on
Commit
d80f7bd
·
verified ·
1 Parent(s): 45e96b5

Delete stream_files_to_cos.py

Browse files
Files changed (1) hide show
  1. stream_files_to_cos.py +0 -230
stream_files_to_cos.py DELETED
@@ -1,230 +0,0 @@
1
def stream_file_to_cos():
    """Factory for a watsonx.ai deployable function.

    Builds and returns a ``score`` closure that streams files from HTTP(S)
    URLs into IBM Cloud Object Storage (COS). The imports live inside the
    factory so the returned closure is self-contained when deployed.
    """
    # Import dependencies (function-scope: required by the watsonx.ai
    # deployable-function packaging model).
    import ibm_boto3
    import requests
    from ibm_botocore.client import Config
    import re
    import io
    from urllib.parse import unquote

    def extract_filename_from_headers(response):
        """
        Extract the actual filename from response headers.

        Checks Content-Disposition first, then falls back to the last URL
        path segment, optionally deriving an extension from Content-Type
        via the mimetypes library.
        """
        import mimetypes

        # Ensure the mimetypes database is initialized with common types.
        mimetypes.init()

        # Register common MIME types that may be missing from the default
        # database (data-science formats the stock table often lacks).
        for mime_type, extension in (
            ('application/x-jsonlines', '.jsonl'),
            ('application/parquet', '.parquet'),
            ('application/x-ipynb+json', '.ipynb'),
            ('application/yaml', '.yaml'),
            ('text/yaml', '.yaml'),
            ('application/toml', '.toml'),
        ):
            if not mimetypes.guess_extension(mime_type):
                mimetypes.add_type(mime_type, extension)

        # Try Content-Disposition header first.
        content_disposition = response.headers.get('Content-Disposition')
        if content_disposition:
            # Look for filename= or filename*= parameters (RFC 6266);
            # the optional group captures the charset''-style encoding prefix.
            matches = re.findall(r'filename\*?=(?:([^\']*\'\')?([^;\n]*))', content_disposition)
            if matches:
                # Take the last match and handle percent-encoded filenames.
                encoding, filename = matches[-1]
                if encoding:
                    filename = unquote(filename)
                return filename.strip('"\'')

        # Fallback: last path segment of the final URL, query string stripped.
        url_path = response.url.split('/')[-1].split('?')[0]

        # If the URL segment carries no extension, derive one from Content-Type.
        content_type = response.headers.get('Content-Type', '').split(';')[0]
        if content_type and '.' not in url_path:
            extension = mimetypes.guess_extension(content_type)
            if extension:
                return f"{url_path}{extension}"

        # Last resort: the raw URL segment as-is.
        return url_path

    def score(payload):  # or def score(payload, token=None) to add authentication
        """
        WatsonX.ai deployable function to stream files from HTTP to Cloud Object Storage.

        Expected simplified format:
        [
          {
            "cos_config": {
              "bucket_name": "my-bucket",
              "api_key": "my-api-key",
              "instance_id": "my-instance-id",
              "auth_endpoint": "https://iam.cloud.ibm.com/identity/token",
              "endpoint_url": "https://s3.us-south.cloud-object-storage.appdomain.cloud"
            },
            "source_urls": ["https://example.com/file1.pdf", "https://example.com/file2.csv"],
            "prefix": "my/prefix",
            "http_method": "GET"
          }
        ]

        Which you can produce with this kind of helper function:

            def reformat_for_wxai_scoring(input_data):
                '''Converts input data to WatsonX.ai scoring payload format.'''
                inputs = [input_data] if isinstance(input_data, dict) else input_data
                if not inputs:
                    return {"input_data": [{"fields": [], "values": [[]]}]}
                fields = list(inputs[0].keys())
                values = [[obj.get(field, None) for field in fields] for obj in inputs]
                return {"input_data": [{"fields": fields, "values": values}]}

        Returns a watsonx.ai-style ``{'predictions': [...]}`` dict; errors are
        reported in-band (never raised to the caller).
        """
        try:
            # Extract the actual payload from the input_data envelope:
            # one row of values keyed by the declared fields.
            fields = payload["input_data"][0]["fields"]
            values = payload["input_data"][0]["values"][0]
            params = dict(zip(fields, values))

            # Extract and validate the COS configuration.
            cos_config = params.get('cos_config', {})
            required_configs = ['bucket_name', 'api_key', 'instance_id', 'auth_endpoint', 'endpoint_url']
            missing_configs = [k for k in required_configs if not cos_config.get(k)]
            if missing_configs:
                return {
                    'predictions': [{
                        'fields': ['status', 'message'],
                        'values': [['error', f"Missing required configuration: {', '.join(missing_configs)}"]]
                    }]
                }

            # source_urls is mandatory; a bare string is promoted to a list.
            source_urls = params.get('source_urls', [])
            if not source_urls:
                return {
                    'predictions': [{
                        'fields': ['status', 'message'],
                        'values': [['error', "Missing required parameter: source_urls"]]
                    }]
                }
            if isinstance(source_urls, str):
                source_urls = [source_urls]

            prefix = params.get('prefix', '')
            http_method = params.get('http_method', 'GET')

            # Initialize the COS client (IAM/oauth signature).
            cos_client = ibm_boto3.client(
                "s3",
                ibm_api_key_id=cos_config['api_key'],
                ibm_service_instance_id=cos_config['instance_id'],
                ibm_auth_endpoint=cos_config['auth_endpoint'],
                config=Config(signature_version="oauth"),
                endpoint_url=cos_config['endpoint_url']
            )

            # Normalize prefix to either "" or "some/path/" (single trailing slash).
            if prefix:
                prefix = prefix.strip('/')
                if prefix:
                    prefix = f"{prefix}/"

            # Loop-invariant transfer tuning: multipart above 1MB.
            transfer_config = ibm_boto3.s3.transfer.TransferConfig(
                multipart_threshold=1024**2, max_concurrency=100  # 1MB
            )

            # Track per-URL outcomes.
            results = []
            errors = []

            # One session reused for all downloads; closed deterministically.
            with requests.Session() as session:
                for source_url in source_urls:
                    try:
                        # Stream the download so large files are chunked.
                        response = session.request(http_method, source_url, stream=True)
                        with response:
                            response.raise_for_status()
                            # Derive the object name from the response, then
                            # combine with the prefix for the full COS key.
                            # (Bug fix: was f"{prefix}(unknown)", which dropped
                            # the filename and collided all prefixed uploads.)
                            filename = extract_filename_from_headers(response)
                            target_key = f"{prefix}{filename}"

                            # Buffer the body in memory for upload.
                            # NOTE(review): whole file is held in RAM — fine for
                            # modest files; confirm expected sizes before reuse.
                            file_buffer = io.BytesIO()
                            for chunk in response.iter_content(chunk_size=8192):
                                if chunk:
                                    file_buffer.write(chunk)
                            file_buffer.seek(0)  # rewind for upload

                        # Upload the buffered file to COS.
                        cos_client.upload_fileobj(
                            file_buffer, cos_config["bucket_name"], target_key, Config=transfer_config
                        )

                        results.append({
                            "source_url": source_url,
                            "bucket": cos_config['bucket_name'],
                            "key": target_key,
                            "filename": filename,
                            "status": "success"
                        })

                    except Exception as e:
                        # Best-effort per URL: record the failure and continue.
                        errors.append({
                            "source_url": source_url,
                            "error": str(e)
                        })

            # Prepare response in watsonx.ai format.
            response_data = {
                "successful_uploads": results,
                "failed_uploads": errors,
                "total_processed": len(source_urls),
                "successful_count": len(results),
                "failed_count": len(errors)
            }

            return {
                'predictions': [{
                    'fields': ['status', 'data'],
                    'values': [['success' if results else 'error', response_data]]
                }]
            }

        except Exception as e:
            # Top-level boundary: report the failure in-band rather than raising,
            # as required by the deployable-function contract.
            return {
                'predictions': [{
                    'fields': ['status', 'message'],
                    'values': [['error', f"Error processing request: {str(e)}"]]
                }]
            }

    return score


score = stream_file_to_cos()