from packages.dummyLibraries.dummies import board import packages.dummyLibraries.neopixel as neopixel from packages.dummyLibraries.dummies import busio import packages.dummyLibraries.adafruit_pca9685 as adafruit_pca9685 from packages.dummyLibraries.dummies import BytesIO from packages.dummyLibraries.dummies import PiCamera # import board # import neopixel # import busio # import adafruit_pca9685 # from picamera import PiCamera # from io import BytesIO import os import time from flask import Flask, request, jsonify, make_response, send_file from PIL import Image, ImageDraw import numpy as np import time import math from flask import Flask from flask_apscheduler import APScheduler import multiprocessing class Camera: # the squares on the image taken which contain one piece of the cube pixel_groups = [ (120, 0, 185, 34), (341, 0, 430, 15), (568, 0, 627, 34), (71, 140, 160, 300), (240, 140, 250, 300), (624, 140, 700, 300), (130, 413, 180, 472), (333, 422, 424, 472), (573, 417, 622, 476), ] stream = None camera = None def __init__(self): # setup picamera self.stream = BytesIO() self.camera = PiCamera() # fix camera params for consistent images self.camera.iso = 100 self.camera.shutter_speed = 9252 # self.camera.exposure_mode = 'off' self.camera.awb_mode = 'off' self.camera.awb_gains = (397 / 256, 297 / 256) def capture(self): # TODO can't figure out an easier workaround for this # save the image as capture1, rename all previous images to capture n+1 # delete any images above 6 as we shouldn't need them # useful for debugging stuff. print('e') dirs = sorted(os.listdir('piDummyDirectory/captures')) for i in range(min(len(dirs) - 1, 4), -1, -1): os.rename(os.path.join('piDummyDirectory/captures', dirs[i]), os.path.join('piDummyDirectory/captures', 'capture' + str(i + 2) + '.jpg')) self.camera.capture('captures/capture1.jpg') image = Image.open('piDummyDirectory/captures/capture1.jpg') face_colours = [] draw = ImageDraw.Draw(image) for i, face in enumerate(self.pixel_groups): # crop the image based on the pixels provided by face, # then find the mean of all the above colours face_colours.append(np.mean(list(image.crop(face).getdata()), axis=0).tolist()) colour = ( int(face_colours[-1][0]), int(face_colours[-1][1]), int(face_colours[-1][2]), ) draw.rectangle(face, outline=colour, width=10) draw.rectangle(face, outline=(0,0,0), width=2) draw.rectangle((430, 140+i*20, 500, 160+i*20), fill=colour, outline=(0, 0, 0), width=2) draw.text((face[0], face[1]), str(i)) image.save('captures/capture1.jpg', "jpeg") return face_colours app = Flask(__name__) i2c = busio.I2C(board.SCL, board.SDA) pca = adafruit_pca9685.PCA9685(i2c) pca.frequency = 50 # servo_channel = pca.channels[0] pixels = neopixel.NeoPixel(board.D18, 8) camera = Camera() # data variables info capturedColours = [[] for _ in range(6)] # store 6 of the previous camera captures received_data = {} # Store received data in a dictionary def set_servos(data): global pca index = int(data.get("index")) value = float(data.get("value")) if value == -180: duty = 0 else: duty = int(1500 + (value / 180) * 6500) pca.channels[index].duty_cycle = duty def set_leds(data): global pixels index = int(data.get("index")) value = eval(data.get("value")) if index == -1: pixels.fill(value) else: pixels[index] = value def cam_capture(_data): capturedColours.append(camera.capture()) capturedColours.pop(0) @app.route('/receiveDirect', methods=['POST']) def receive_direct(): data = request.form.to_dict() component = data.get("component") if component == "servo": set_servos(data) elif component == "led": set_leds(data) elif component == "camera": cam_capture(data) print(data) return "Data received successfully" @app.route('/receiveList', methods=['POST']) def receive_list(): face_colours = [] data_list = request.get_json() elapsed_time = 0 print(data_list) # a bit of a strange loop structure, I know. # loops through the list and executes each item once enough time has passed. # assumes that the received list is sorted in order of the time each move is executed. data_list_index = 0 data = data_list[data_list_index] while data_list_index < len(data_list) - 1: if data_list[data_list_index].get('time') == elapsed_time: component = data.get("component") if component == "servo": set_servos(data) elif component == "led": set_leds(data) elif component == "camera": cam_capture(data) data_list_index += 1 data = data_list[data_list_index] else: time.sleep(data_list[data_list_index].get('time') - elapsed_time) elapsed_time = data_list[data_list_index].get('time') print(capturedColours) return "Data received successfully" @app.route('/get_data', methods=['GET']) def get_data(): data = request.form.to_dict() data_request = data.get("request") if data_request == "image": response = make_response(jsonify(capturedColours), 200) response.mimetype = "text/plain" return response @app.route('/camera', methods=['GET']) def get_camera(): return jsonify(camera.capture()) if __name__ == '__main__': scheduler = APScheduler() scheduler.init_app(app) scheduler.start() INTERVAL_TASK_ID = 'interval-task-id' scheduler.add_job(id=INTERVAL_TASK_ID, func=interval_task, trigger='interval', seconds=.3) app.run(host='0.0.0.0', port=5000) print("aaa")