OpenCV Optical Flow Algorithm for Object Tracking
One of the problems with the Meanshift algorithm was that the region of interest always stayed the same size, no matter whether the object moved closer to the camera or farther away. The region needs to adapt its size to the size and rotation of the target, which is what the CAMShift algorithm remedied.
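For reference, a minimal CAMShift sketch might look like the following. The initial region of interest and the histogram back-projection setup are illustrative assumptions, not values from this article:

# standalone CAMShift sketch - the initial window (x, y, w, h) is a placeholder
import cv2
import numpy as np

cap = cv2.VideoCapture('resources/group_of_people_02.mp4')
ok, frame = cap.read()
track_window = (300, 200, 100, 50)
x, y, w, h = track_window
# build a hue histogram of the region of interest for back-projection
hsv_roi = cv2.cvtColor(frame[y:y + h, x:x + w], cv2.COLOR_BGR2HSV)
roi_hist = cv2.calcHist([hsv_roi], [0], None, [180], [0, 180])
cv2.normalize(roi_hist, roi_hist, 0, 255, cv2.NORM_MINMAX)
criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 1)

while True:
    ok, frame = cap.read()
    if not ok:
        break
    back_projection = cv2.calcBackProject([cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)],
                                          [0], roi_hist, [0, 180], 1)
    # CAMShift returns a rotated rectangle that adapts its size and angle to the target
    rotated_rect, track_window = cv2.CamShift(back_projection, track_window, criteria)
    box = cv2.boxPoints(rotated_rect).astype(np.int32)
    frame = cv2.polylines(frame, [box], True, (0, 255, 0), 2)
    cv2.imshow('CAMShift', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break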
Optical flow is the pattern of apparent motion of image objects between two consecutive frames, caused by the movement of the object or the camera. It is a 2D vector field where each vector is a displacement vector showing the movement of points from the first frame to the second.
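Formally, these methods rest on the brightness constancy assumption: a pixel keeps its intensity while it moves, so I(x, y, t) = I(x + dx, y + dy, t + dt). A first-order Taylor expansion turns this into the optical flow constraint I_x u + I_y v + I_t = 0, where I_x, I_y, I_t are the image derivatives and (u, v) is the displacement vector that methods like Lucas-Kanade solve for around each tracked point. This is the standard textbook formulation rather than anything specific to the OpenCV code below.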
Optical Flow (Sparse)
Optical flow is the pattern of apparent motion of image objects between two consecutive frames caused by the movement of object or camera. It is 2D vector field where each vector is a displacement vector showing the movement of points from first frame to second.
Get your Video
Get your video file input:
import cv2
import numpy as np

cap = cv2.VideoCapture('resources/group_of_people_02.mp4')
# get first video frame
ok, frame = cap.read()
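If the path is wrong or the codec is not supported, this first read already fails; a small guard using the ok flag (not part of the original snippet) makes that explicit:

# stop early if the first frame could not be read
if not ok:
    raise IOError('could not read the first frame - check the video path')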
Auto Select Object to Track
I want to use the Shi-Tomasi corner detector, which is used by the OpenCV Good Features to Track function, to detect the corner points of an object. As usual, the input should be a grayscale image. You then specify the number of corners you want to find and the quality level, a value between 0 and 1 that denotes the minimum quality of a corner below which every candidate is rejected. Finally, we provide the minimum Euclidean distance between detected corners:
# generate initial corners of detected object
# set limit, minimum distance in pixels and quality of object corner to be tracked
parameters_shitomasi = dict(maxCorners=100, qualityLevel=0.3, minDistance=7)
With all this information, the function finds corners in the image. All corners below the quality level are rejected, and the remaining corners are sorted by quality in descending order. The function then takes the strongest corner, throws away all nearby corners within the minimum distance, and returns the N strongest corners:
# convert to grayscale
frame_gray_init = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# Use Shi-Tomasi to detect object corners / edges from initial frame
edges = cv2.goodFeaturesToTrack(frame_gray_init, mask = None, **parameters_shitomasi)
Now that we have the edges - the corner points - of all detected objects in the initial frame, we can start comparing consecutive frames of the video to this initial frame and its edges. But one more thing: we need a clean sheet to draw our detection lines on and a random colour generator for those lines:
# create a black canvas the size of the initial frame
canvas = np.zeros_like(frame)
# create random colours for visualization for all 100 max corners for RGB channels
colours = np.random.randint(0, 255, (100, 3))
And now we can loop through the remaining video frames, comparing each one to the previous frame and propagating the detected corner points. The while loop uses the cv2.calcOpticalFlowPyrLK function, which needs to be configured with the following parameters:
# set min size of tracked object, e.g. 15x15px
parameter_lucas_kanade = dict(winSize=(15, 15), maxLevel=2, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))
Parameters | Description |
---|---|
winSize | size of the search window at each pyramid level. |
maxLevel | 0-based maximal pyramid level number; if set to 0, pyramids are not used (single level), if set to 1, two levels are used, and so on; if pyramids are passed to the input then the algorithm will use as many levels as the pyramids have, but no more than maxLevel. |
criteria | termination criteria of the iterative search algorithm: the search stops after the specified maximum number of iterations (criteria.maxCount) or when the search window moves by less than criteria.epsilon. |
while True:
    # get next frame
    ok, frame = cap.read()
    if not ok:
        print("[INFO] end of file reached")
        break
    # prepare grayscale image
    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    # update object corners by comparing with found edges in initial frame
    update_edges, status, errors = cv2.calcOpticalFlowPyrLK(frame_gray_init, frame_gray, edges, None,
                                                            **parameter_lucas_kanade)
    # only keep edges the algorithm successfully tracked
    new_edges = update_edges[status == 1]
    # to calculate directional flow we need to compare with the previous position
    old_edges = edges[status == 1]

    for i, (new, old) in enumerate(zip(new_edges, old_edges)):
        a, b = new.ravel()
        c, d = old.ravel()
        # draw line between old and new corner point with random colour
        canvas = cv2.line(canvas, (int(a), int(b)), (int(c), int(d)), colours[i].tolist(), 2)
        # draw circle around new position
        frame = cv2.circle(frame, (int(a), int(b)), 5, colours[i].tolist(), -1)

    result = cv2.add(frame, canvas)
    cv2.imshow('Optical Flow (sparse)', result)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

    # overwrite initial frame with current before restarting the loop
    frame_gray_init = frame_gray.copy()
    # update to new edges before restarting the loop
    edges = new_edges.reshape(-1, 1, 2)
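One caveat: points that fail the status check are dropped for good, so the track can thin out over a long video. A possible extension - just a sketch, not something the original code does - is to re-run the Shi-Tomasi detector at the end of the loop body whenever too few points survive:

    # optional sketch: re-detect corners in the current frame when most points are lost
    # (the threshold of 10 is an arbitrary choice)
    if len(edges) < 10:
        edges = cv2.goodFeaturesToTrack(frame_gray, mask=None, **parameters_shitomasi)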
The result shows coloured motion trails following each tracked corner point through the video.
Manually Select Object to Track
The auto-selection works OK in the example above, but it can quickly become messy in crowded spaces. Here you might want to select the starting point of your track yourself. So let's create a function that is automatically called when a window with the name Optical Flow is created and that listens to your left mouse button. Whenever you click the frame displayed inside the window, the coordinates of your cursor are recorded and the algorithm tries to track the underlying edge point of your selected object:
# define function to manually select object to track
def select_point(event, x, y, flags, params):
    global point, selected_point, old_points
    # record coordinates of mouse click
    if event == cv2.EVENT_LBUTTONDOWN:
        point = (x, y)
        selected_point = True
        old_points = np.array([[x, y]], dtype=np.float32)

# associate select function with window 'Optical Flow'
cv2.namedWindow('Optical Flow')
cv2.setMouseCallback('Optical Flow', select_point)

# initialize variables updated by the callback function
selected_point = False
point = ()
old_points = np.array([[]], dtype=np.float32)
And again we need to loop through the remaining frames applying the Sparse Optical Flow algorithm to track the object:
# loop through the remaining frames of the video
# and apply the algorithm to track the selected object
while True:
    # get next frame
    ok, frame = cap.read()
    if not ok:
        break
    # convert to grayscale
    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    if selected_point is True:
        cv2.circle(frame, point, 5, (0, 0, 255), 2)
        # update object position by comparing the current frame with the previous one
        new_points, status, errors = cv2.calcOpticalFlowPyrLK(frame_gray_init, frame_gray, old_points, None,
                                                              **parameter_lucas_kanade)
        # overwrite initial frame with current before restarting the loop
        frame_gray_init = frame_gray.copy()
        # read old and new position before overwriting the old points
        x, y = new_points.ravel()
        j, k = old_points.ravel()
        # update to the new point before restarting the loop
        old_points = new_points
        # draw line between old and new point position
        canvas = cv2.line(canvas, (int(x), int(y)), (int(j), int(k)), (0, 255, 0), 3)
        # draw circle around new position
        frame = cv2.circle(frame, (int(x), int(y)), 5, (0, 255, 0), -1)

    result = cv2.add(frame, canvas)
    cv2.imshow('Optical Flow', result)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
Optical Flow (Dense)
The Lucas-Kanade method computes optical flow for a sparse feature set (in our example, the corners detected with the Shi-Tomasi algorithm). OpenCV also provides an algorithm to find dense optical flow, which computes the flow for all points in the frame.
There is only one change outside of the while-loop compared to sparse flow - we need to define our canvas in the HSV colour space and initialize it with maximum saturation:
# create canvas to paint on
hsv_canvas = np.zeros_like(frame)
# set saturation value (position 2 in HSV space) to 255
hsv_canvas[..., 1] = 255
Again, we need to grab the following frames from our video and compare each one with the previous frame. The function in OpenCV is called cv2.calcOpticalFlowFarneback() and uses the following configuration parameters:
Parameters | Description |
---|---|
prev | first 8-bit single-channel input image. |
next | second input image of the same size and the same type as prev. |
flow | computed flow image that has the same size as prev and type CV_32FC2. |
pyr_scale | parameter, specifying the image scale (< 1) to build pyramids for each image; pyr_scale=0.5 means a classical pyramid, where each next layer is twice smaller than the previous one. |
levels | number of pyramid layers including the initial image; levels=1 means that no extra layers are created and only the original images are used. |
winsize | averaging window size; larger values increase the algorithm robustness to image noise and give more chances for fast motion detection, but yield more blurred motion field. |
iterations | number of iterations the algorithm does at each pyramid level. |
poly_n | size of the pixel neighborhood used to find polynomial expansion in each pixel; larger values mean that the image will be approximated with smoother surfaces, yielding a more robust algorithm and a more blurred motion field; typically poly_n = 5 or 7. |
poly_sigma | standard deviation of the Gaussian that is used to smooth derivatives used as a basis for the polynomial expansion; for poly_n=5, you can set poly_sigma=1.1, for poly_n=7, a good value would be poly_sigma=1.5. |
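One note before the loop: it optionally writes each visualization frame to disk through a video_output writer that is not created anywhere in the snippets above. A minimal sketch of such a writer - file name, codec and frame rate are my own placeholder choices - could look like this:

# optional: create the writer used for recording inside the loop below
# (output file name, codec and FPS are illustrative placeholders)
frame_height, frame_width = frame.shape[:2]
video_output = cv2.VideoWriter('optical_flow_dense.avi',
                               cv2.VideoWriter_fourcc(*'XVID'),
                               25.0, (frame_width, frame_height))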
while True:
    # get next frame
    ok, frame = cap.read()
    if not ok:
        print("[INFO] end of file reached")
        break
    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    # compare the previous frame with the current frame
    flow = cv2.calcOpticalFlowFarneback(frame_gray_init, frame_gray, None, 0.5, 3, 15, 3, 5, 1.1, 0)
    # convert the cartesian flow vectors (x, y) to polar coordinates (magnitude, angle)
    magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])
    # set hue of HSV canvas (position 1) - map the angle from [0, 2*pi] to [0, 180]
    hsv_canvas[..., 0] = angle * 180 / np.pi / 2
    # set pixel intensity value (position 3) from the normalized flow magnitude
    hsv_canvas[..., 2] = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX)

    frame_rgb = cv2.cvtColor(hsv_canvas, cv2.COLOR_HSV2BGR)
    # optional recording of the result/mask
    video_output.write(frame_rgb)

    cv2.imshow('Optical Flow (dense)', frame_rgb)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    # set initial frame to current frame
    frame_gray_init = frame_gray
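Once the loop has finished, it is good practice to release the capture and the optional writer and close the display windows; assuming the same cap and video_output objects as above:

# release video resources and close all OpenCV windows
cap.release()
video_output.release()
cv2.destroyAllWindows()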