"""Four-stage video pipeline: read -> vehicle detection -> plate OCR -> write.

Frames flow through three bounded multiprocessing queues:

    ReadFrame (thread) -> inputQueue -> VehicleDetection (process)
        -> vehicleDetectionQueue -> NumberPlateOcr (process)
        -> outputQueue -> OutputFrame (thread)

End-of-stream is signalled by a single ``None`` item that every stage
forwards downstream before it exits.
"""
import argparse
import multiprocessing as mp
import os
import threading as th
import time
from datetime import datetime
from time import perf_counter

# NOTE(review): DISPLAY is set before cv2 is imported, presumably so that
# OpenCV's GUI backend can locate an X server -- confirm it is still needed.
os.environ['DISPLAY'] = ':0'
import cv2

# Bounded queues connecting the pipeline stages (at most 100 items each).
inputQueue = mp.Queue(100)
vehicleDetectionQueue = mp.Queue(100)
outputQueue = mp.Queue(100)

# Frame size of the opened source, filled in by ReadFrame.__init__ and read
# by OutputFrame.__init__ to size the video writer.
IMAGE_HEIGHT = mp.Value('i', 0)
IMAGE_WIDTH = mp.Value('i', 0)


class ReadFrame(th.Thread):
    """Producer thread: reads frames from a video source into a queue.

    Each successfully read frame is enqueued as ``(frame, frameId)``.  When
    the source is exhausted (or could not be opened) a single ``None`` is
    enqueued so the downstream stage knows the stream has ended.
    """

    def __init__(self, source, name='Input thread', custom_id=1,
                 input_queue=None) -> None:
        """Open ``source`` and record its frame size.

        Args:
            source: Anything accepted by ``cv2.VideoCapture``.
            name: Base name of the thread.
            custom_id: Suffix appended to ``name``.
            input_queue: Queue to write frames to; defaults to the
                module-level ``inputQueue``.
        """
        super().__init__()
        self.frameId = 1
        self.stopped = False
        self.name = f'{name} {custom_id}'
        self.inputQueue = inputQueue if input_queue is None else input_queue
        self.videoCaptureObject = cv2.VideoCapture(source)
        # If the source failed to open there is nothing to read, so the
        # loop in run() is skipped and only the end token is sent.
        self.grabbed = self.videoCaptureObject.isOpened()
        if self.grabbed:
            IMAGE_HEIGHT.value = int(
                self.videoCaptureObject.get(cv2.CAP_PROP_FRAME_HEIGHT))
            IMAGE_WIDTH.value = int(
                self.videoCaptureObject.get(cv2.CAP_PROP_FRAME_WIDTH))
            print(f'Reading from source = {source}')

    def run(self):
        """Read frames until the source is exhausted, then send the end token."""
        initial = perf_counter()
        try:
            while self.grabbed:
                self.grabbed, self.frame = self.videoCaptureObject.read()
                if not self.grabbed:
                    # A failed read carries no frame; do not enqueue it.
                    break
                self.inputQueue.put((self.frame, self.frameId))
                print(f"{self.name} frame added with id {self.frameId}\n")
                self.frameId += 1
        finally:
            print('--Done reading frames--\n')
            self.videoCaptureObject.release()
            # Always send the end token, even after an error, so the
            # downstream stages do not block forever on get().
            self.inputQueue.put(None)
        end = perf_counter()
        print(f'Total time taken by {self.name} = {end - initial}\n')


class VehicleDetection(mp.Process):
    """Process stage between the input queue and the detection queue.

    Currently a pass-through: every ``(frame, frameId)`` item is forwarded
    unchanged.  The stage exits when it receives the ``None`` end token,
    which it forwards downstream first.
    """

    def __init__(self, name='Vehicle Detection Process', custom_id=1,
                 input_queue=None, output_queue=None):
        """Create the stage.

        Args:
            name: Base name of the process.
            custom_id: Suffix appended to ``name``.
            input_queue: Queue to read from; defaults to ``inputQueue``.
            output_queue: Queue to write to; defaults to
                ``vehicleDetectionQueue``.
        """
        super().__init__()
        self.name = f'{name} {custom_id}'
        # The queues are stored on the instance so they travel with the
        # Process object to the child instead of being looked up as module
        # globals there.
        self.inputQueue = inputQueue if input_queue is None else input_queue
        self.vehicleDetectionQueue = (
            vehicleDetectionQueue if output_queue is None else output_queue)

    def run(self):
        """Forward items until the end token arrives."""
        initial = perf_counter()
        while True:
            # Block on get() and stop on the end token; polling qsize()
            # would race with a slow reader and end the stream early.
            value = self.inputQueue.get()
            if value is None:
                self.vehicleDetectionQueue.put(None)
                print(f'{self.name} exiting !! \n')
                end = perf_counter()
                print(f'Total time taken by {self.name} = {end - initial}\n')
                return
            frame, frameId = value
            print(f"{self.name} Got frame with ID {frameId} "
                  f"qsize = {self.inputQueue.qsize()}\n")
            # Vehicle detection on `frame` goes here.
            self.vehicleDetectionQueue.put((frame, frameId))


class NumberPlateOcr(mp.Process):
    """Process stage between the detection queue and the output queue.

    Currently a pass-through: every ``(frame, frameId)`` item is forwarded
    unchanged.  The stage exits when it receives the ``None`` end token,
    which it forwards downstream first.
    """

    def __init__(self, name='Number plate OCR Process', custom_id=1,
                 input_queue=None, output_queue=None):
        """Create the stage.

        Args:
            name: Base name of the process.
            custom_id: Suffix appended to ``name``.
            input_queue: Queue to read from; defaults to
                ``vehicleDetectionQueue``.
            output_queue: Queue to write to; defaults to ``outputQueue``.
        """
        super().__init__()
        self.name = f'{name} {custom_id}'
        self.vehicleDetectionQueue = (
            vehicleDetectionQueue if input_queue is None else input_queue)
        self.outputQueue = outputQueue if output_queue is None else output_queue

    def run(self):
        """Forward items until the end token arrives."""
        initial = perf_counter()
        while True:
            value = self.vehicleDetectionQueue.get()
            if value is None:
                print(f'{self.name} exiting !! \n')
                self.outputQueue.put(None)
                end = perf_counter()
                print(f'Total time taken by {self.name} = {end - initial}\n')
                return
            frame, frameId = value
            print(f"{self.name} Got frame with ID {frameId}\n")
            # Number plate OCR on `frame` goes here.
            self.outputQueue.put((frame, frameId))


class OutputFrame(th.Thread):
    """Consumer thread: writes frames from a queue to an MJPG video file."""

    def __init__(self, name='output thread', custom_id=1,
                 outputfilename="output.avi", output_queue=None):
        """Open the video writer.

        The writer is sized from ``IMAGE_WIDTH`` / ``IMAGE_HEIGHT`` as they
        are at construction time, so a ``ReadFrame`` must have been
        constructed first.

        Args:
            name: Base name of the thread.
            custom_id: Suffix appended to ``name``.
            outputfilename: Path of the video file to write.
            output_queue: Queue to read frames from; defaults to the
                module-level ``outputQueue``.
        """
        super().__init__()
        self.name = f'{name} {custom_id}'
        self.outputfilename = outputfilename
        self.outputQueue = outputQueue if output_queue is None else output_queue
        print(f'frame size {IMAGE_HEIGHT.value} {IMAGE_WIDTH.value}')
        self.videoWriterObject = cv2.VideoWriter(
            outputfilename,
            cv2.VideoWriter_fourcc(*'MJPG'),
            30,
            (IMAGE_WIDTH.value, IMAGE_HEIGHT.value),
        )

    def run(self):
        """Write frames until the end token arrives, then close the file."""
        initial = perf_counter()
        try:
            while True:
                value = self.outputQueue.get()
                if value is None:
                    break
                frame, frameId = value
                if frame is None:
                    # Tolerate an empty frame from an upstream stage.
                    continue
                print(f'{self.name} got frame with ID {frameId} '
                      f'shape = {frame.shape}')
                self.videoWriterObject.write(frame)
        finally:
            # Release the writer so the output file is closed properly.
            self.videoWriterObject.release()
        end = perf_counter()
        print(f'Total time taken by {self.name} = {end - initial}\n')


def main():
    """Parse arguments, run the pipeline under cProfile and dump the stats."""
    import cProfile

    app_profiler = cProfile.Profile()
    parser = argparse.ArgumentParser(
        description='BitSilica Traffic Analysis Solution')
    parser.add_argument('--image', help=' Full Path to image file.')
    parser.add_argument('--video', help='Full Path to video file.')
    parser.add_argument('--realtime', help='Camera Connected Input')
    args = parser.parse_args()

    # Only --video is consumed below; fail with a usage message instead of
    # a TypeError from os.path.basename(None).
    if args.video is None:
        parser.error('--video is required')

    # Base name of the input without its extension, whatever its length.
    video_stem = os.path.splitext(os.path.basename(args.video))[0]
    outputvideo = f'output {video_stem} {datetime.now()}.avi'
    print(f'-----> Writing to file {outputvideo} <-------\n')

    initial = perf_counter()
    app_profiler.enable()

    # ReadFrame must be constructed before OutputFrame: it publishes the
    # frame size that the writer is created with.
    readFramesThread = ReadFrame(args.video)
    vehicleDetectionProcess = VehicleDetection()
    numberPlateOcrProcess = NumberPlateOcr()
    outputframeThread = OutputFrame(outputfilename=outputvideo)

    # Start the worker processes before any thread is running; they simply
    # block on their input queue until frames arrive, so no start-up delay
    # is needed.
    vehicleDetectionProcess.start()
    numberPlateOcrProcess.start()
    readFramesThread.start()
    outputframeThread.start()
    print(f'{vehicleDetectionProcess.name} {vehicleDetectionProcess.pid} \n')
    print(f'{numberPlateOcrProcess.name} {numberPlateOcrProcess.pid} \n')

    readFramesThread.join()
    print(f'readframesthread {readFramesThread.is_alive()}\n')
    vehicleDetectionProcess.join()
    print(f'vehicleDetectionProcess {vehicleDetectionProcess.is_alive()}\n')
    numberPlateOcrProcess.join()
    print(f'numberPlateOcrProcess {numberPlateOcrProcess.is_alive()}\n')
    outputframeThread.join()
    print(f'{outputframeThread.name} {outputframeThread.is_alive()}')

    app_profiler.disable()
    end = perf_counter()
    print(f'Total time taken by __main__ = {end - initial}\n')

    profile_name = '{}.prof'.format(video_stem)
    print("------------------------\nEnd of execution, dumping profile stats\n-------------------------")
    app_profiler.dump_stats(profile_name)


if __name__ == '__main__':
    main()