hadadrjt's picture
CLI: Initial.
4a6a19d
#!/usr/bin/env python3
#
# SPDX-FileCopyrightText: Hadad <[email protected]>
# SPDX-License-Identifier: Apache-2.0
#
# UltimaX Intelligence CLI
# 0.0.1 version
#
import sys
import argparse
try:
import pollinations # Provider.
except ImportError:
print("Module 'pollinations' not found. Install it with: pip install pollinations pollinations.ai", file=sys.stderr)
sys.exit(2) # Exit to signal missing dependency.
def response(output):  # Print non-stream responses.
    """Print a complete (non-streamed) model response to stdout.

    The text to print is chosen by the first rule that matches:

    * ``bytes`` / ``bytearray``: decoded as UTF-8, with undecodable bytes
      replaced, mirroring how the streaming path in ``main`` treats bytes
      instead of printing a ``b'...'`` repr.
    * an object exposing a ``text`` attribute: that attribute.
    * a ``dict`` containing a ``"text"`` key: that value.
    * a ``list`` / ``tuple``: ``str()`` of every part, joined with no separator.
    * anything else: ``str(output)``.

    Args:
        output: Whatever the model call returned.

    Returns:
        None. The chosen text is written to stdout followed by a newline.
    """
    try:
        if isinstance(output, (bytes, bytearray)):  # Raw bytes payload.
            text = output.decode("utf-8", errors="replace")
        elif hasattr(output, "text"):  # Handle objects with .text.
            text = output.text
        elif isinstance(output, dict) and "text" in output:  # Handle dicts with 'text'.
            text = output["text"]
        elif isinstance(output, (list, tuple)):  # Handle sequences of parts.
            text = "".join(map(str, output))
        else:
            text = str(output)  # Generic fallback.
    except Exception:
        # Best effort: if inspecting the object fails (e.g. a property or
        # __str__ of a part raises), fall back to the plain string form.
        text = str(output)
    # Print exactly once, outside the guard, so an I/O error while printing
    # cannot trigger the fallback and emit the payload a second time.
    print(text)
def _read_prompt(words):  # Resolve where the prompt comes from.
    """Return the raw prompt from positional words or from piped stdin.

    Stdin is read when no words were given or when the only word is the
    ``-`` sentinel. If stdin is needed but is an interactive terminal,
    the request is cancelled with exit status 1.
    """
    if words and words != ["-"]:
        return " ".join(words)  # Join positional pieces.
    if sys.stdin.isatty():  # Nothing piped in.
        print("Empty input. Request cancelled.", file=sys.stderr)
        sys.exit(1)  # Exit to indicate no work done.
    return sys.stdin.read()  # Read piped stdin.


def _token_text(token):  # Normalise one streamed token.
    """Return the printable text carried by a single streamed token."""
    if isinstance(token, bytes):
        # Replace undecodable bytes rather than raising, so a bad chunk is
        # never shown as a b'...' repr.
        return token.decode("utf-8", errors="replace")
    if isinstance(token, str):
        return token
    if isinstance(token, dict) and "text" in token:  # Dict tokens.
        return token["text"]
    if hasattr(token, "text"):  # Object tokens with .text.
        return token.text
    return str(token)  # Generic tokens.


def main():  # Script entry point.
    """Parse the command line, send the prompt to the model, print the answer.

    The prompt is taken from the positional arguments, or from stdin when
    none are given or the single argument is ``-``. By default the answer is
    streamed token by token; ``--no-stream`` requests it in one piece and
    prints it through ``response``.

    Exit status:
        1   -- empty/blank input, model construction failure, or an error
               while calling the model (a traceback is written to stderr).
        130 -- interrupted with Ctrl+C while calling the model.
    """
    parser = argparse.ArgumentParser(description="UltimaX Intelligence")
    parser.add_argument("text", nargs="*", help="Helper for the user input.")  # Positional text.
    parser.add_argument("--no-stream", dest="stream", action="store_false", help="Use non-streaming mode.")
    args = parser.parse_args()  # Parse flags and positionals.

    # 'prompt' rather than 'input' so the builtin is not shadowed.
    prompt = _read_prompt(args.text)

    # Reject empty or whitespace-only input.
    if not prompt.strip():
        print("Empty input. Request cancelled.", file=sys.stderr)  # Consistent message.
        sys.exit(1)  # Exit to indicate no work done.

    # Build model instance.
    try:
        # Provider default model (the original note says OpenAI GPT-4.1 Nano;
        # that cannot be confirmed from this file).
        model = pollinations.Text()
    except Exception:
        print("Failed to construct default model.", file=sys.stderr)  # High level failure.
        import traceback  # Lazy import for error details.
        traceback.print_exc()  # Show stack trace for debugging.
        sys.exit(1)  # Exit due to setup failure.

    try:
        if args.stream:  # Streaming path.
            for token in model(prompt, stream=True):  # Iterate tokens.
                try:
                    piece = _token_text(token)
                except Exception:
                    piece = str(token)  # Fallback on token inspection error.
                # Printed outside the guard so a failed write is not retried.
                print(piece, end="", flush=True)  # Emit token immediately.
            print()  # Newline after stream completes.
        else:  # Non-streaming path.
            response(model(prompt, stream=False))  # Get full response and print.
    except KeyboardInterrupt:
        print("\nInterrupted by user.", file=sys.stderr)  # User aborted.
        sys.exit(130)  # Conventional Ctrl+C code.
    except Exception:
        print("Error while calling the model:", file=sys.stderr)  # Runtime failure.
        import traceback  # Lazy import for details.
        traceback.print_exc()  # Show stack trace.
        sys.exit(1)  # Exit on runtime error.
if __name__ == "__main__":  # Run for terminal use; importing the module no longer starts the CLI.
    main()