Yaron Koresh commited on
Commit
7aa37aa
·
verified ·
1 Parent(s): 86141f3

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +18 -124
app.py CHANGED
@@ -16,6 +16,7 @@ import threading
16
  import asyncio
17
  from queue import Queue as BlockingQueue
18
  from functools import partial
 
19
 
20
  # external
21
 
@@ -129,134 +130,27 @@ pipe.scheduler = DDIMScheduler(
129
  pipe.load_ip_adapter("h94/IP-Adapter", subfolder="models", weight_name="ip-adapter-plus_sd15.bin")
130
  pipe.enable_free_init(method="butterworth", use_fast_sampling=fast)
131
 
132
- # Threading
133
-
134
- class TwoSidedQueue:
135
- def __init__(self, queue_in, queue_out):
136
- self._queue_in = queue_in
137
- self._queue_out = queue_out
138
- self._sides = {
139
- 'empty': queue_out,
140
- 'full': queue_out,
141
- 'get': queue_in,
142
- 'get_nowait': queue_in,
143
- 'join': queue_out,
144
- 'put': queue_out,
145
- 'put_nowait': queue_out,
146
- 'qsize': queue_out,
147
- 'task_done': queue_in,
148
- }
149
- def __getattr__(self, name):
150
- return getattr(self._sides.get(name, self._queue_in), name)
151
-
152
-
153
- class LaunchAsync:
154
- def __init__(self, coro, *args, **kwargs):
155
- self._coro = coro
156
- self._args = args
157
- self._kwargs = kwargs
158
- self._thread = None
159
- self._loop = None
160
- self._task = None
161
- self._queue_in = None
162
- self._queue_out = None
163
- self._size = 0
164
-
165
- def size(self, size):
166
- self._size = size or 0
167
- return self
168
-
169
- def put(self, data, *, timeout=None):
170
- """
171
- `put` data in for the `coro` to `get` out. Will block if the maximum `size` was reached.
172
-
173
- Does nothing if the `coro` is dead.
174
- """
175
- try:
176
- return asyncio.run_coroutine_threadsafe(self._queue_out.put(data), self._loop).result(timeout)
177
- except RuntimeError:
178
- if self._loop.is_running():
179
- raise
180
- else:
181
- return None
182
-
183
- def get(self, *, timeout=None):
184
- """
185
- `get` data out of the `coro` it `put` in. Will block if the queue is empty.
186
-
187
- Returns `None` if the `coro` is dead.
188
- """
189
- try:
190
- return asyncio.run_coroutine_threadsafe(self._queue_in.get(), self._loop).result(timeout)
191
- except RuntimeError:
192
- if self._loop.is_running():
193
- raise
194
- else:
195
- return None
196
-
197
- def dead(self):
198
- """
199
- Return `true` if the other side is dead (the `coro` has exited, with or without error).
200
- """
201
- return not self._loop.is_running()
202
-
203
- def __enter__(self):
204
- # asyncio.run is used as it's a battle-tested way to safely set up a new loop and tear
205
- # it down. However it does mean it's necessary to wait for the task to run before it's
206
- # possible to get said loop and task back. For this, the usual blocking queue is used.
207
- oneshot = BlockingQueue(1)
208
- self._thread = threading.Thread(target=asyncio.run, args=(
209
- self._run(self._coro, self._size, oneshot, self._args, self._kwargs),))
210
- self._thread.start()
211
- self._loop, self._task, self._queue_in, self._queue_out = oneshot.get()
212
- return self
213
-
214
- def __exit__(self, exc_type, exc_value, exc_traceback):
215
- try:
216
- self._loop.call_soon_threadsafe(self._task.cancel)
217
- except RuntimeError:
218
- if self._loop.is_running():
219
- raise
220
- finally:
221
- self._thread.join()
222
-
223
- @staticmethod
224
- async def _run(coro, size, oneshot, args, kwargs):
225
- # asyncio.Queue's are created here so that they pick up the right loop.
226
- queue_in, queue_out = asyncio.Queue(size), asyncio.Queue(size)
227
- oneshot.put((asyncio.get_event_loop(), asyncio.current_task(), queue_in, queue_out))
228
- try:
229
- # `queue_in` and `queue_out` are intentionally swapped here.
230
- await coro(TwoSidedQueue(queue_out, queue_in), *args, **kwargs)
231
- except asyncio.CancelledError:
232
- pass
233
-
234
- class Command:
235
- def __init__(self, func, data=None):
236
- self.func = func
237
- self.data = data
238
-
239
- def parallel(timeout,*pairs):
240
  if len(pairs) == 0:
241
  return
242
  if len(pairs) == 1:
243
  pairs = pairs[0]
 
244
  out = []
245
-
246
- async def async_main(queue):
247
- while True:
248
- command = await queue.get()
249
- await queue.put(command.func(*command.data))
250
- out.append(queue.get(timeout=timeout))
251
-
252
- with LaunchAsync(async_main) as queue:
253
- for pair in pairs:
254
- f = pair.pop(0)
255
- queue.put(Command(f, pair))
256
- while len(out) < len(pairs):
257
- time.sleep(5)
258
- return out
259
-
260
  # functionality
261
 
262
  def run(cmd):
@@ -357,7 +251,7 @@ def handle(*inp):
357
  ln = len(result)
358
 
359
  parargs = [[calc,*inp] for i in range(ln)]
360
- out_pipe = parallel(5,parargs)
361
  names = []
362
  for i in out_pipe:
363
  name = generate_random_string(12)+".png"
 
16
  import asyncio
17
  from queue import Queue as BlockingQueue
18
  from functools import partial
19
+ from multiprocessing import Process
20
 
21
  # external
22
 
 
130
  pipe.load_ip_adapter("h94/IP-Adapter", subfolder="models", weight_name="ip-adapter-plus_sd15.bin")
131
  pipe.enable_free_init(method="butterworth", use_fast_sampling=fast)
132
 
133
+ # Parallelism
134
+
135
+ def parallel(*pairs):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
136
  if len(pairs) == 0:
137
  return
138
  if len(pairs) == 1:
139
  pairs = pairs[0]
140
+
141
  out = []
142
+ running_tasks = []
143
+
144
+ for pair in pairs:
145
+ f = pair.pop(0)
146
+ running_tasks.append(Process(target=task(*pair)))
147
+ for running_task in running_tasks:
148
+ running_task.start()
149
+ for running_task in running_tasks:
150
+ running_task.join()
151
+ out.append(running_task.get())
152
+ return out
153
+
 
 
 
154
  # functionality
155
 
156
  def run(cmd):
 
251
  ln = len(result)
252
 
253
  parargs = [[calc,*inp] for i in range(ln)]
254
+ out_pipe = parallel(parargs)
255
  names = []
256
  for i in out_pipe:
257
  name = generate_random_string(12)+".png"