|
- import os
- import cv2
- import argparse
- import time
- import random
- import multiprocessing as mp
- import threading as th
- from queue import Queue
-
# Inter-stage queues, created once at import time in pipeline order.
#   inputQueue            : ReadFrame        -> VehicleDetection
#   vehicleDetectionQueue : VehicleDetection -> NumberPlateOcr
#   numberPlateOcrQueue   : NumberPlateOcr   -> DisplayFrame
#   displayQueue          : declared but not read or written by the visible code
#   outputQueue           : DisplayFrame     -> (consumer not shown here)
(
    inputQueue,
    vehicleDetectionQueue,
    numberPlateOcrQueue,
    displayQueue,
    outputQueue,
) = (mp.Queue() for _ in range(5))
class ReadFrame(th.Thread):
    """Thread that reads frames from a video source and enqueues them.

    Each successfully read frame is put on the sink queue as
    ``(frame, frameId)``, with ``frameId`` starting at 1 and increasing by one
    per frame. When reading stops (source exhausted, ``stopped`` set to True,
    or an error), a single ``(None, frameId)`` tuple is enqueued as an
    end-of-stream marker and the capture object is released.

    Args:
        source: Value handed to ``cv2.VideoCapture`` (e.g. a video file path).
        sinkQueue: Queue-like object receiving the frames. Defaults to the
            module-level ``inputQueue``.
    """

    def __init__(self, source, *, sinkQueue=None) -> None:
        super().__init__()
        self.frameId = 1
        self.stopped = False  # set to True from another thread to stop reading
        self.grabbed = True
        self.frame = None

        self.videoCaptureObject = cv2.VideoCapture(source)
        print(f'Reading from source = {source}')

        # Resolve the destination once; the module global is only looked up
        # when the caller did not supply a queue.
        self.sinkQueue = inputQueue if sinkQueue is None else sinkQueue

    def run(self):
        """Read frames until the source is exhausted or ``stopped`` is set."""
        try:
            while not self.stopped:
                (self.grabbed, self.frame) = self.videoCaptureObject.read()
                if not self.grabbed:
                    # Failed read: treat as end of stream instead of
                    # enqueueing it as if it were a real frame.
                    break
                self.sinkQueue.put((self.frame, self.frameId))
                print(f"IP frame added with id {self.frameId}\n")
                self.frameId += 1
        finally:
            # Always free the capture handle and tell consumers that no more
            # frames are coming, even if reading raised.
            self.videoCaptureObject.release()
            self.stopped = True
            self.sinkQueue.put((None, self.frameId))
-
-
class VehicleDetection(mp.Process):
    """Pipeline stage: take ``(frame, frameId)`` items and forward them.

    Items are read from the source queue and put unchanged on the sink queue
    (the actual detection step is still a placeholder). An item whose frame is
    ``None`` is treated as the end-of-stream marker: it is forwarded and the
    process loop ends.

    Args:
        *args, **kwargs: Passed through to ``multiprocessing.Process``.
        sourceQueue: Queue to read from. Defaults to the module-level
            ``inputQueue``.
        sinkQueue: Queue to write to. Defaults to the module-level
            ``vehicleDetectionQueue``.
    """

    def __init__(self, *args, sourceQueue=None, sinkQueue=None, **kwargs):
        super().__init__(*args, **kwargs)
        # Keep the queues on the instance so they travel with the Process
        # object to the child. Under the "spawn" start method the module is
        # re-imported in the child and module-level queues are new, unrelated
        # objects, so relying on the globals inside run() would block forever.
        self.sourceQueue = inputQueue if sourceQueue is None else sourceQueue
        self.sinkQueue = vehicleDetectionQueue if sinkQueue is None else sinkQueue

    def run(self):
        """Forward items until the end-of-stream marker is seen."""
        while True:
            (frame, frameId) = self.sourceQueue.get()
            if frame is None:
                # End of stream: pass the marker downstream, then stop.
                self.sinkQueue.put((frame, frameId))
                break
            print(f"\n VD Got frame with ID {frameId}")
            # do some processing here.
            self.sinkQueue.put((frame, frameId))
-
class NumberPlateOcr(mp.Process):
    """Pipeline stage: take ``(frame, frameId)`` items and forward them.

    Items are read from the source queue and put unchanged on the sink queue
    (the actual OCR step is still a placeholder). An item whose frame is
    ``None`` is treated as the end-of-stream marker: it is forwarded and the
    process loop ends.

    Args:
        *args, **kwargs: Passed through to ``multiprocessing.Process``.
        sourceQueue: Queue to read from. Defaults to the module-level
            ``vehicleDetectionQueue``.
        sinkQueue: Queue to write to. Defaults to the module-level
            ``numberPlateOcrQueue``.
    """

    def __init__(self, *args, sourceQueue=None, sinkQueue=None, **kwargs):
        super().__init__(*args, **kwargs)
        # Keep the queues on the instance so they travel with the Process
        # object to the child. Under the "spawn" start method the module is
        # re-imported in the child and module-level queues are new, unrelated
        # objects, so relying on the globals inside run() would block forever.
        self.sourceQueue = vehicleDetectionQueue if sourceQueue is None else sourceQueue
        self.sinkQueue = numberPlateOcrQueue if sinkQueue is None else sinkQueue

    def run(self):
        """Forward items until the end-of-stream marker is seen."""
        while True:
            (frame, frameId) = self.sourceQueue.get()
            if frame is None:
                # End of stream: pass the marker downstream, then stop.
                self.sinkQueue.put((frame, frameId))
                break
            print(f"\n NP Got frame with ID {frameId}")
            # do some processing here.
            self.sinkQueue.put((frame, frameId))
-
class DisplayFrame(mp.Process):
    """Pipeline stage: take ``(frame, frameId)`` items and forward them.

    Items are read from the source queue and put unchanged on the sink queue
    (the actual display step is still a placeholder). An item whose frame is
    ``None`` is treated as the end-of-stream marker: it is forwarded and the
    process loop ends.

    Args:
        *args, **kwargs: Passed through to ``multiprocessing.Process``.
        sourceQueue: Queue to read from. Defaults to the module-level
            ``numberPlateOcrQueue``.
        sinkQueue: Queue to write to. Defaults to the module-level
            ``outputQueue``.
    """

    def __init__(self, *args, sourceQueue=None, sinkQueue=None, **kwargs):
        super().__init__(*args, **kwargs)
        # Keep the queues on the instance so they travel with the Process
        # object to the child. Under the "spawn" start method the module is
        # re-imported in the child and module-level queues are new, unrelated
        # objects, so relying on the globals inside run() would block forever.
        self.sourceQueue = numberPlateOcrQueue if sourceQueue is None else sourceQueue
        self.sinkQueue = outputQueue if sinkQueue is None else sinkQueue

    def run(self):
        """Forward items until the end-of-stream marker is seen."""
        while True:
            (frame, frameId) = self.sourceQueue.get()
            if frame is None:
                # End of stream: pass the marker downstream, then stop.
                self.sinkQueue.put((frame, frameId))
                break
            print(f'DF got frame with ID {frameId}')
            # display image here with frame ID
            self.sinkQueue.put((frame, frameId))
-
class SaveOutput(mp.Process):
    """Placeholder process for saving the pipeline output.

    Not implemented yet. The intent is to take all the ``(frame, frameId)``
    pairs, sort them and merge them into a video using some tool. No ``run``
    override is defined, so the inherited ``Process.run`` is used.
    """
-
-
-
-
if __name__ == '__main__':
    import cProfile

    parser = argparse.ArgumentParser(description='BitSilica Traffic Analysis Solution')
    parser.add_argument('--image', help=' Full Path to image file.')
    parser.add_argument('--video', help='Full Path to video file.')
    parser.add_argument('--realtime',help='Camera Connected Input')

    args = parser.parse_args()
    if not args.video:
        # Only the --video input is wired into the pipeline below; fail with a
        # usage message instead of passing None to the frame reader.
        parser.error('--video is required')

    app_profiler = cProfile.Profile()
    # enable profiler here.
    app_profiler.enable()
    try:
        readFramesThread = ReadFrame(args.video)
        vehicleDetectionProcess = VehicleDetection()
        numberPlateOcrProcess = NumberPlateOcr()

        # Start the child processes before the reader thread so that, with the
        # "fork" start method, no fork happens while another thread is running.
        vehicleDetectionProcess.start()
        numberPlateOcrProcess.start()
        readFramesThread.start()

        # Wait for the reader to exhaust its source so the banner and stats
        # dump below happen after the input has been read, not right after
        # start(). The worker processes are not joined here: nothing in this
        # script drains numberPlateOcrQueue, so a join could block.
        readFramesThread.join()
    finally:
        # disable profiler here.
        app_profiler.disable()

        # The previous ``'temp.prof'.format(...)`` had no placeholder, so the
        # file name was always 'temp.prof'; keep that name without the no-op.
        profile_name = 'temp.prof'
        print("------------------------\nEnd of execution, dumping profile stats\n-------------------------")
        app_profiler.dump_stats(profile_name)
|