3 次代碼提交 a399fb6854 ... f922a1659e

作者 SHA1 備註 提交日期
  Richard Köhl f922a1659e introduce multiprocessing with temporary storing on ramdisk 1 年之前
  Richard Köhl 60e567502e go to next loop as soon as possible 1 年之前
  Richard Köhl 526616d350 fix environment path 1 年之前
共有 3 個文件被更改,包括 62 次插入16 次删除
  1. 5 2
      dungeon.py
  2. 56 13
      helper.py
  3. 1 1
      screenshot.py

+ 5 - 2
dungeon.py

@@ -101,12 +101,15 @@ while True:
                 tap(*locations[0])
                 if name in ["auto_battle", "ok"]:
                     time.sleep(1.5)
+                    continue
             if name == "attack":
                 if len(locations) == 1:
                     element = first_template(tpl_mixed)
                     is_mixed = element is not None
                     tap(*locations[0])
-                elif len(locations) == 2:
+                    continue
+
+                if len(locations) == 2:
                     element = first_template(tpl_mixed)
                     if element:
                         is_mixed = True
@@ -117,7 +120,7 @@ while True:
                             element = first_template(tpl_earth)
 
                     tap(element[0], locations[0][1])
-
+                    continue
         print("...")
     except Exception as e:
         print(f"error: {e}")

+ 56 - 13
helper.py

@@ -5,6 +5,9 @@ import numpy as np
 import os
 import threading
 import subprocess
+import multiprocessing
+import sys
+import tempfile
 from io import BytesIO
 from PIL import Image
 from ppadb.client import Client as AdbClient
@@ -19,27 +22,68 @@ else:
     with_cuda = 0
 
 load_dotenv()
+ram_drive_path = os.getenv("RAMDRIVE_PATH")
+if (
+    not ram_drive_path
+    or not os.path.exists(ram_drive_path)
+    or not os.path.ismount(ram_drive_path)
+):
+    print("no ram drive ({ram_drive_path})")
+
 android_address = os.getenv("ANDROID_ADDRESS")
+client = AdbClient(host="127.0.0.1", port=5037)
+if not android_address or not client:
+    print(f"android address wrong? ({android_address})")
+    sys.exit()
 
 
 def get_current_screen():
+    if not current_screen:
+        print("something went wrong. not able to get screen.")
+        sys.exit()
     return current_screen
 
 
-def capture_current_screen(timeout=5):  # Timeout in seconds
-    def target():
-        global current_screen
-        current_screen = device.screencap()
+def screencap_worker(device, temp_file_name):
+    try:
+        screenshot = device.screencap()
+        with open(temp_file_name, "wb") as f:
+            f.write(screenshot)
+    except Exception as e:
+        print(f"Error in worker process: {e}")
 
-    capture_thread = threading.Thread(target=target)
-    capture_thread.start()
-    capture_thread.join(timeout)
 
-    if capture_thread.is_alive():
+def capture_current_screen(timeout=10):
+    # Create a temporary file
+    temp_file = tempfile.NamedTemporaryFile(delete=False)
+    temp_file_name = temp_file.name
+    temp_file.close()
+
+    capture_process = multiprocessing.Process(
+        target=screencap_worker, args=(device, temp_file_name)
+    )
+    capture_process.start()
+    capture_process.join(timeout)
+
+    if capture_process.is_alive():
+        capture_process.terminate()
+        capture_process.join()
         print("Screen capture timed out")
-        # Handle the timeout situation, e.g., by retrying or aborting
-        capture_thread.join()
+        os.remove(temp_file_name)
+        return None
+
+    if not os.path.exists(temp_file_name) or os.path.getsize(temp_file_name) == 0:
+        print("No data in the temporary file")
+        os.remove(temp_file_name)
+        return None
 
+    # Read the screenshot from the temporary file
+    global current_screen
+    with open(temp_file_name, "rb") as f:
+        current_screen = f.read()
+
+    # Clean up
+    os.remove(temp_file_name)
     return current_screen
 
 
@@ -115,7 +159,7 @@ def first_template(template_image):
 def find_template(template_image):
     if with_cuda == 1:
         # Ensure the images are in the correct format (BGR for OpenCV)
-        target_image = capture_current_screen()
+        target_image = get_current_screen()
 
         # Upload images to GPU
         target_image_gpu = cv2.cuda_GpuMat()
@@ -218,7 +262,7 @@ def non_max_suppression(boxes, overlapThresh):
 
 def save_screenshot(path="test"):
     # Take a screenshot
-    result = capture_current_screen()
+    result = get_current_screen()
 
     timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
     image = Image.open(io.BytesIO(result))
@@ -249,7 +293,6 @@ def save_screenshot2(path="test"):
     print(f"snap: {jpeg_filename}")
 
 
-client = AdbClient(host="127.0.0.1", port=5037)
 device = client.device(android_address)
 
 current_screen = capture_current_screen()

+ 1 - 1
screenshot.py

@@ -10,7 +10,7 @@ from helper import (
     save_screenshot,
 )
 
-screenshot_address = os.getenv("ANDROID_ADDRESS")
+screenshot_address = os.getenv("SCREENSHOT_PATH")
 
 # templates
 close_sub_fights = cv2.imread("templates/close_sub_fights.jpg")