File size: 4,522 Bytes
4a6a19d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#!/usr/bin/env python3
#
# SPDX-FileCopyrightText: Hadad <[email protected]>
# SPDX-License-Identifier: Apache-2.0
#
# UltimaX Intelligence CLI
# 0.0.1 version
#

import sys
import argparse

# The provider library is a third-party dependency: on ImportError, print an
# install hint to stderr and stop with status 2 instead of a raw traceback.
try:
    import pollinations  # Provider.
except ImportError:
    print("Module 'pollinations' not found. Install it with: pip install pollinations pollinations.ai", file=sys.stderr)
    sys.exit(2)  # Exit to signal missing dependency.

def response(output):
    """Print a non-streaming model result to stdout, followed by a newline.

    The shape of the result is not fixed, so several shapes are accepted,
    checked in this order:

    * ``bytes`` -- decoded as UTF-8 (undecodable bytes are replaced), so the
      non-streaming path prints text the same way the streaming path does
      instead of a ``b'...'`` repr.
    * an object exposing a ``text`` attribute -- that attribute is printed.
    * a ``dict`` containing a ``"text"`` key -- that value is printed.
    * a ``list`` or ``tuple`` -- its items are stringified and concatenated.
    * anything else -- ``str(output)`` is printed.

    Args:
        output: The value returned by the model call.

    Returns:
        None. The text is written to stdout.
    """
    try:
        if isinstance(output, bytes):
            print(output.decode("utf-8", errors="replace"))
            return
        if hasattr(output, "text"):
            print(output.text)
            return
        if isinstance(output, dict) and "text" in output:
            print(output["text"])
            return
        if isinstance(output, (list, tuple)):
            print("".join(str(part) for part in output))
            return
    except Exception:
        # Deliberate best-effort: if inspecting or rendering a recognised
        # shape fails, fall back to the generic string form below rather than
        # aborting the whole request.
        pass
    print(str(output))

def _read_prompt(words):
    """Return the raw prompt text from positional words or from stdin.

    Stdin is consulted when no positional words were given or when the only
    word is the ``-`` sentinel. If stdin is an interactive terminal in that
    situation there is nothing piped in, so an empty string is returned and
    the caller's blank-input check rejects it.
    """
    if words and words != ["-"]:
        return " ".join(words)  # Join positional pieces.
    if sys.stdin.isatty():  # Nothing piped in.
        return ""
    return sys.stdin.read()  # Read piped stdin.


def _emit_token(token):
    """Write one streamed token to stdout immediately, without a newline.

    Accepts ``str``, ``bytes`` (decoded), dicts with a ``"text"`` key, objects
    with a ``text`` attribute, and falls back to ``str(token)`` for anything
    else or if handling a recognised shape raises.
    """
    try:
        if isinstance(token, bytes):
            text = token.decode()
        elif isinstance(token, str):
            text = token
        elif isinstance(token, dict) and "text" in token:
            text = token["text"]
        elif hasattr(token, "text"):
            text = token.text
        else:
            text = str(token)
        print(text, end="", flush=True)  # Flush so tokens appear as they arrive.
    except Exception:
        print(str(token), end="", flush=True)  # Fallback on token error.


def main():
    """Run the CLI: read a prompt, send it to the model, print the reply.

    The prompt comes from the positional arguments joined by spaces, or from
    piped stdin when no arguments (or a lone ``-``) are given. Output is
    streamed token by token unless ``--no-stream`` is passed.

    Exits with status 1 when the prompt is empty or whitespace-only, when the
    model cannot be constructed, or when the model call fails (a traceback is
    printed to stderr in the latter two cases), and with status 130 when the
    user interrupts the request.
    """
    parser = argparse.ArgumentParser(description="UltimaX Intelligence")
    parser.add_argument("text", nargs="*", help="Helper for the user input.")  # Positional text.
    parser.add_argument("--no-stream", dest="stream", action="store_false", help="Use non-streaming mode.")
    args = parser.parse_args()  # Parse flags and positionals.

    # Named `prompt` rather than `input` so the builtin is not shadowed.
    prompt = _read_prompt(args.text)

    # Reject empty or whitespace-only input.
    if not prompt.strip():
        print("Empty input. Request cancelled.", file=sys.stderr)
        sys.exit(1)  # Exit to indicate no work done.

    # Build model instance.
    try:
        model = pollinations.Text()  # Default text model (original note: OpenAI GPT-4.1 Nano).
    except Exception:
        print("Failed to construct default model.", file=sys.stderr)  # High level failure.
        import traceback  # Lazy import for error details.
        traceback.print_exc()  # Show stack trace for debugging.
        sys.exit(1)  # Exit due to setup failure.

    try:
        if args.stream:  # Streaming path.
            for token in model(prompt, stream=True):
                _emit_token(token)
            print()  # Newline after stream completes.
        else:  # Non-streaming path.
            response(model(prompt, stream=False))
    except KeyboardInterrupt:
        print("\nInterrupted by user.", file=sys.stderr)  # User aborted.
        sys.exit(130)  # Conventional Ctrl+C code.
    except Exception:
        print("Error while calling the model:", file=sys.stderr)  # Runtime failure.
        import traceback  # Lazy import for details.
        traceback.print_exc()  # Show stack trace.
        sys.exit(1)  # Exit on runtime error.

# NOTE(review): main() is invoked unconditionally at module level, so importing
# this file also parses sys.argv and runs the CLI.
main()  # Run immediately for terminal use.