"""Android UI automation helpers: grab screenshots over ADB and locate
UI elements via OpenCV template matching (GPU path when CUDA is present)."""

import datetime
import io
import multiprocessing
import os
import subprocess
import sys
import tempfile
import threading
from io import BytesIO

import cv2
import numpy as np
from dotenv import load_dotenv
from PIL import Image
from ppadb.client import Client as AdbClient

# Detect CUDA once at import time; find_template() selects GPU vs CPU path.
with_cuda = 0
if cv2.cuda.getCudaEnabledDeviceCount() > 0:
    print("CUDA is available")
    with_cuda = 1
else:
    print("CUDA is not available")
    with_cuda = 0

load_dotenv()

# Prefer a mounted RAM drive for screenshot scratch space; otherwise fall
# back to the plain screenshot directory.
ram_drive_path = os.getenv("RAMDRIVE_PATH")
if (
    not ram_drive_path
    or not os.path.exists(ram_drive_path)
    or not os.path.ismount(ram_drive_path)
):
    ram_drive_path = os.getenv("SCREENSHOT_PATH")
    # BUG FIX: original string lacked the f-prefix and printed literal braces.
    print(f"no ram drive (fallback to {ram_drive_path})")

android_address = os.getenv("ANDROID_ADDRESS")
client = AdbClient(host="127.0.0.1", port=5037)
if not android_address or not client:
    print(f"android address wrong? ({android_address})")
    sys.exit()

# Most recent raw screenshot bytes from the device (PNG, as returned by
# `adb screencap`).  BUG FIX: initialize so get_current_screen() hits its
# error branch instead of raising NameError if no capture ever succeeded.
current_screen = None


def get_current_screen():
    """Return the cached screenshot bytes; exit the process if none exist."""
    if not current_screen:
        print("something went wrong. not able to get screen.")
        sys.exit()
    return current_screen


def screencap_worker(device, temp_file_name):
    """Child-process body: take a screencap and write the bytes to a file.

    Runs in a separate process so a hung ADB call can be terminated from
    the parent (see capture_current_screen).
    """
    try:
        screenshot = device.screencap()
        with open(temp_file_name, "wb") as f:
            f.write(screenshot)
    except Exception as e:
        print(f"Error in worker process: {e}")


def capture_current_screen(timeout=10):
    """Capture the device screen with a hard timeout.

    Spawns screencap_worker in a child process and kills it if it exceeds
    `timeout` seconds.  On success caches the bytes in the module-global
    `current_screen` and returns them; returns None on timeout or if the
    worker produced no data.

    NOTE(review): on spawn-based platforms (Windows/macOS) `device` must be
    picklable and this module should be import-safe — confirm if ported.
    """
    # The child communicates through a temp file rather than a pipe so the
    # parent can still read a completed capture after terminating the child.
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    temp_file_name = temp_file.name
    temp_file.close()

    capture_process = multiprocessing.Process(
        target=screencap_worker, args=(device, temp_file_name)
    )
    capture_process.start()
    capture_process.join(timeout)

    if capture_process.is_alive():
        capture_process.terminate()
        capture_process.join()
        print("Screen capture timed out")
        os.remove(temp_file_name)
        return None

    if not os.path.exists(temp_file_name) or os.path.getsize(temp_file_name) == 0:
        print("No data in the temporary file")
        os.remove(temp_file_name)
        return None

    global current_screen
    with open(temp_file_name, "rb") as f:
        current_screen = f.read()

    os.remove(temp_file_name)
    return current_screen


def find_center(x1, y1, x2, y2):
    """Return the integer center point of the rectangle (x1, y1)-(x2, y2)."""
    centerX = round(x1 + (x2 - x1) / 2)
    centerY = round(y1 + (y2 - y1) / 2)
    return centerX, centerY


def call_device_shell(action, timeout=10):
    """Run an adb shell command in a daemonless thread with a soft timeout.

    The command is not cancelled on timeout — we only stop waiting for it.
    """
    def target():
        device.shell(action)

    thread = threading.Thread(target=target)
    thread.start()
    thread.join(timeout)
    if thread.is_alive():
        print("ran into timeout")
        thread.join()


def tap(x, y=None, text=None):
    """Send an `input tap` at a location.

    Accepts either two ints (x, y), a pre-formatted "x y" string, or an
    (x, y) tuple.  `text` is only echoed in the log line.
    """
    if isinstance(x, int):
        if not isinstance(y, int):
            raise ValueError("y must be an int when x is an int")
        location = f"{x} {y}"
    elif isinstance(x, str):
        location = x
    elif isinstance(x, tuple):
        location = f"{x[0]} {x[1]}"
    else:
        # BUG FIX: original message omitted the accepted tuple form.
        raise TypeError("x must be an int, a string, or a tuple")

    action = f"input tap {location}"
    print(f"{action} {text}")
    call_device_shell(action, timeout=5)


def tap_button(template):
    """Find `template` on screen and tap its first (top-most) match, if any."""
    button = find_template(template)
    if len(button) == 0:
        return
    tap(f"{button[0][0]} {button[0][1]}")


def swipe(start, end, duration=1000):
    """Send an `input swipe` from `start` to `end` ("x y" strings) over `duration` ms."""
    action = f"input swipe {start} {end} {duration}"
    print(action)
    call_device_shell(action, timeout=5)


def look_for_templates(templates):
    """Return (name, locations) for the first template in `templates` that
    matches the current screen, or (None, None) when nothing matches."""
    for name, template in templates.items():
        locations = find_template(template)
        if len(locations) > 0:
            return name, locations
    return None, None


def first_template(template_image):
    """Return the first (top-most) match center for `template_image`, or None."""
    result = find_template(template_image)
    if len(result) > 0:
        return result[0]
    return None


def find_template(template_image):
    """Locate all occurrences of `template_image` (BGR ndarray) on the screen.

    Returns a list of (x, y) match centers sorted by y ascending.  Uses the
    CUDA matcher when available, otherwise CPU cv2.matchTemplate.
    """
    # BUG FIX: compute template size up front — the original only set h/w in
    # the CPU branch, so the CUDA branch raised NameError below.
    h, w = template_image.shape[:-1]

    if with_cuda == 1:
        # BUG FIX: the screenshot is raw PNG bytes; decode to a BGR array
        # before uploading (the original uploaded undecoded bytes).
        raw = get_current_screen()
        target_image = cv2.imdecode(np.frombuffer(raw, np.uint8), cv2.IMREAD_COLOR)

        target_image_gpu = cv2.cuda_GpuMat()
        template_image_gpu = cv2.cuda_GpuMat()
        target_image_gpu.upload(target_image)
        template_image_gpu.upload(template_image)

        matcher = cv2.cuda.createTemplateMatching(cv2.CV_8UC3, cv2.TM_CCOEFF_NORMED)
        result = matcher.match(target_image_gpu, template_image_gpu).download()
    else:
        target_image = Image.open(BytesIO(get_current_screen()))
        # PIL decodes to RGB; OpenCV expects BGR.
        target_image = np.array(target_image)
        target_image = cv2.cvtColor(target_image, cv2.COLOR_RGB2BGR)
        result = cv2.matchTemplate(target_image, template_image, cv2.TM_CCOEFF_NORMED)

    threshold = 0.9  # Adjust this threshold based on your requirements
    locations = np.where(result >= threshold)
    locations = list(zip(*locations[::-1]))  # (x, y) order

    rectangles = [(*loc, loc[0] + w, loc[1] + h) for loc in locations]
    rectangles = non_max_suppression(rectangles, 0.3)

    coordinates = [
        find_center(startX, startY, endX, endY)
        for startX, startY, endX, endY in rectangles
    ]
    return sorted(coordinates, key=lambda c: c[1])


def non_max_suppression(boxes, overlapThresh):
    """Malisiewicz-style non-maximum suppression.

    `boxes` is a sequence of (x1, y1, x2, y2); boxes overlapping a kept box
    by more than `overlapThresh` are discarded.  Returns an int ndarray of
    the surviving boxes (empty list for empty input).
    """
    if len(boxes) == 0:
        return []

    boxes = np.array(boxes, dtype="float")
    pick = []

    x1 = boxes[:, 0]
    y1 = boxes[:, 1]
    x2 = boxes[:, 2]
    y2 = boxes[:, 3]

    # Area of each box; process boxes in order of bottom-right y-coordinate.
    area = (x2 - x1 + 1) * (y2 - y1 + 1)
    idxs = np.argsort(y2)

    while len(idxs) > 0:
        # Keep the last remaining box, then drop everything overlapping it.
        last = len(idxs) - 1
        i = idxs[last]
        pick.append(i)

        # Intersection of box i with every remaining candidate.
        xx1 = np.maximum(x1[i], x1[idxs[:last]])
        yy1 = np.maximum(y1[i], y1[idxs[:last]])
        xx2 = np.minimum(x2[i], x2[idxs[:last]])
        yy2 = np.minimum(y2[i], y2[idxs[:last]])

        w = np.maximum(0, xx2 - xx1 + 1)
        h = np.maximum(0, yy2 - yy1 + 1)

        # Overlap ratio relative to each candidate's own area.
        overlap = (w * h) / area[idxs[:last]]

        idxs = np.delete(
            idxs, np.concatenate(([last], np.where(overlap > overlapThresh)[0]))
        )

    return boxes[pick].astype("int")


def save_screenshot(path="test"):
    """Capture the screen and save it as a timestamped JPEG under `path`."""
    result = capture_current_screen()
    # BUG FIX: capture_current_screen() returns None on timeout; the original
    # crashed inside Image.open in that case.
    if result is None:
        print("snap skipped: no screenshot data")
        return

    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    image = Image.open(io.BytesIO(result))
    jpeg_filename = f"{path}/{timestamp}.jpg"
    image = image.convert("RGB")  # Convert to RGB mode for JPEG
    with open(jpeg_filename, "wb") as fp:
        image.save(fp, format="JPEG", quality=85)  # Adjust quality as needed
    print(f"snap: {jpeg_filename}")


def save_screenshot2(path="test"):
    """Alternative screenshot path: shell out to `adb exec-out screencap`."""
    # Argument list instead of shell=True string: same fixed command, no shell.
    proc = subprocess.Popen(
        ["adb", "exec-out", "screencap", "-p"], stdout=subprocess.PIPE
    )
    image_bytes = proc.stdout.read()
    image = cv2.imdecode(np.frombuffer(image_bytes, np.uint8), cv2.IMREAD_COLOR)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    jpeg_filename = f"{path}/{timestamp}.jpg"
    cv2.imwrite(
        jpeg_filename,
        cv2.cvtColor(image, cv2.COLOR_RGB2BGR),
        [int(cv2.IMWRITE_JPEG_QUALITY), 85],
    )
    print(f"snap: {jpeg_filename}")


device = client.device(android_address)
current_screen = capture_current_screen()