2021FallTeam8
Team Members
- Colin Riccio - Mechanical Engineering
- Brian Pack - Mechanical Engineering
- Carolin Lehmacher - Electrical Engineering
Robot
Robocar - Project
The picture on the right shows our Robocar and part of the wiring used for our project.
Mechanical Design
Camera Mount
The mount system was 3D printed and has three parts: a camera backplate, a stand, and a LiDAR mount. The LiDAR mount ties into the front standoffs, and the stand attaches to the electronics plate.
The camera mount has a pivot joint that allows the camera angle to be repositioned. This was initially helpful for finding an optimal angle, but it created a problem: if the mount was bumped, our DonkeyCar models would fail. A better approach might be to start with a variable-angle mount, find the best camera setting, and then create a static mount to use for training.
Electronics Plate
The plate was designed with a full array of mounting holes (with 1 cm spacing) to facilitate the attachment of parts anywhere on the plate. It was laser cut from 1/8 inch acrylic.
Electrical Design
Components
- Jetson Nano 2GB
- Battery
- USB Camera
- Servo
- Xerun 360
- Anti-spark switch
- 12V to 5V Converter
- Flipsky mini
Wiring
The following picture shows the wiring of the robocar used for the project.
EMO Demonstration
Autonomous Laps
Warren Tent, 3 laps with DonkeyCar
Circuit Tent, 3 laps with DonkeyCar
Final Project
Project Overview
Our goal for the final project was to build an image recognition node for the ucsd_robocar2 package that detects speed signs placed on the track and dynamically changes the speed of the car.
Electronic Changes
To control the speed of our car, we had to replace the majority of our electronics and incorporate a VESC motor controller. The VESC reads the sensor in our brushless motor and uses that feedback to set the motor's RPM, so instead of driving the motor with raw PWM we can send RPM values and set the speed of the car directly. Because of the switch to the VESC, we also had to move from the ucsd_robo_car_simple_ros docker package to ucsd_robocar2, since ucsd_robo_car_simple_ros cannot make use of the VESC.
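For a sense of what RPM-based control looks like in isolation, here is a minimal sketch using the third-party pyvesc library over a serial connection. This is not the code path our ROS stack uses, and the port name is an assumption; it only illustrates commanding RPM instead of PWM:

import time
from pyvesc import VESC  # third-party library, assumed installed

# '/dev/ttyACM0' is a guess; the actual device path depends on the setup
with VESC(serial_port='/dev/ttyACM0') as motor:
    # command a target RPM directly; the VESC uses the motor's sensor
    # feedback to hold this speed under changing load
    motor.set_rpm(3000)
    time.sleep(2)
    motor.set_rpm(0)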
Image Recognition
To recognize different speed signs, we chose to use an image classification neural network. We used OpenCV image processing to filter the camera images delivered by the ROS camera_node so that only the speed sign remains visible, in the hope of making the classification model more robust. We used the ROS calibration_node to find the best values for our filter, then collected data for our 3 speed sign classes (~100 images of each sign for training and ~30 for validation) and one null class (~350 training images and ~90 for validation). To collect the data, we wrote a Python script that captures an image from the camera, filters it, and saves it in the folder structure expected by our training pipeline (Data_Collection).
Example of the webcam image after filtering
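The filter itself is only a few OpenCV calls. The following is a condensed version of what our Data_Collection script does (full listing in the Source Code section), using the HSV bounds and crop window we found with the calibration_node; the filter_sign helper is just for illustration, the real scripts inline these steps:

import cv2
import numpy as np

def filter_sign(frame):
    # crop to the region of the frame where signs appear
    img = frame[200:450, 0:800]
    # keep only pixels inside the calibrated HSV range
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
    mask = cv2.inRange(hsv, np.array([84, 54, 34]), np.array([179, 173, 148]))
    masked = cv2.bitwise_and(img, img, mask=mask)
    # collapse to black and white so mostly the sign shape survives
    gray = cv2.cvtColor(masked, cv2.COLOR_BGR2GRAY)
    _, black_and_white = cv2.threshold(gray, 32, 255, cv2.THRESH_BINARY)
    return black_and_white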
After collecting the data, we used a training pipeline built by Team 1 for their final project, which creates a model via transfer learning from ResNet18 (Training_Pipeline).
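The key step in that pipeline (full listing in the Source Code section) is loading an ImageNet-pretrained ResNet18 and resizing its final fully connected layer to output one score per class, in our case the three signs plus the null class:

import torch.nn as nn
from torchvision import models

# start from pretrained weights, then retrain on our sign images
model = models.resnet18(pretrained=True)
model.fc = nn.Linear(model.fc.in_features, 4)  # no_sign, sign_1, sign_2, sign_3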
We created a Python script to test the model: it takes the webcam image, filters it, passes the filtered image to the model, and prints out which sign it sees (Model_Live_Tester).
Video of the image recognition
Creating a ROS Node
We then created a new ROS speed_reader_node, which subscribes to the camera_node to receive images from the webcam and publishes a speed value to the lane_guidance_node (Speed_Sign_Node). First we built the node to get a speed value from the user and publish it, which would then change the speed of the car (Speed_Node). After confirming that publishing worked, we added a subscription to the camera_node and applied the OpenCV filters to each incoming image. The filtered image is passed to the image recognition model, which classifies which sign the camera currently sees. The speed associated with that sign is then published to the lane_guidance_node, which in turn changes the speed of the car.
Testing changing the speed of the car with Speed_Node
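Stripped of the image processing and model code (the full speed_reader_node listing is in the Source Code section), the node's pub/sub wiring looks like this:

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import Float32

class SignReader(Node):
    def __init__(self):
        super().__init__('sign_reader_node')
        # publishes the speed multiplier consumed by lane_guidance_node
        self.speed_publisher = self.create_publisher(Float32, '/speed', 10)
        # classifies every frame the camera node publishes
        self.camera_subscriber = self.create_subscription(
            Image, '/camera/color/image_raw', self.read, 10)

    def read(self, data):
        # filter the image, run the classifier, then publish the matching speed
        pass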
Obstacles
There were several major roadblocks that we dealt with in our project:
- Changing our electronics took a significant amount of time, as we had to solder several components, remove the old parts, redo the wiring, and install the new parts
- The ROS2 docker package had several major bugs, as it is still in beta
- Lighting conditions in the tents made it difficult to use the line-following package any time before sunset
- We were unable to get PyTorch (the package that runs the image classification model) loaded into the Docker container, which meant we could not get the final speed_reader_node to run
Future Improvements
If given more time to work on the project, we would first get PyTorch loaded into the Docker container so that the lane_guidance_node can run alongside the speed_reader_node, recognizing signs and changing the speed. We would then improve the model by adding more speed signs and collecting more data for further training. We would also experiment with the filter to make it less noisy.
Presentations
Weekly presentations, including the project proposal and Gantt chart

Final Presentation: https://drive.google.com/file/d/1r2b6Swi83I8lkU5pRIl6pWr_IVKMZiAg/view?usp=sharing
Source Code
Modified Lane_Guidance_node:
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32, Int32, Int32MultiArray
from geometry_msgs.msg import Twist
import time
import os

NODE_NAME = 'lane_guidance_node'
CENTROID_TOPIC_NAME = '/centroid'
ACTUATOR_TOPIC_NAME = '/cmd_vel'
SIGN_TOPIC_NAME = '/speed'


class PathPlanner(Node):
    def __init__(self):
        super().__init__(NODE_NAME)
        self.twist_publisher = self.create_publisher(Twist, ACTUATOR_TOPIC_NAME, 10)
        self.twist_cmd = Twist()
        self.centroid_subscriber = self.create_subscription(Float32, CENTROID_TOPIC_NAME, self.controller, 10)
        self.sign_subscriber = self.create_subscription(Float32, SIGN_TOPIC_NAME, self.sign_check, 10)

        # Default actuator values
        self.declare_parameters(
            namespace='',
            parameters=[
                ('steering_sensitivity', 1),
                ('no_error_throttle', 1),
                ('error_throttle', 1),
                ('error_threshold', 1),
                ('zero_throttle', 1)
            ])
        self.steering_sensitivity = self.get_parameter('steering_sensitivity').value
        self.no_error_throttle = self.get_parameter('no_error_throttle').value
        self.error_throttle = self.get_parameter('error_throttle').value
        self.error_threshold = self.get_parameter('error_threshold').value
        self.zero_throttle = self.get_parameter('zero_throttle').value
        self.get_logger().info(
            f'\nsteering_sensitivity: {self.steering_sensitivity}'
            f'\nno_error_throttle: {self.no_error_throttle}'
            f'\nerror_throttle: {self.error_throttle}'
            f'\nerror_threshold: {self.error_threshold}'
            f'\nzero_throttle: {self.zero_throttle}'
        )

        # Default speed multiplier
        self.speed_mult = 1.0

    # Get the speed multiplier from the speed topic
    def sign_check(self, data):
        self.speed_mult = data.data
        print('speed set \n')

    def controller(self, data):
        try:
            kp = self.steering_sensitivity
            sm = self.speed_mult
            error_x = data.data
            self.get_logger().info(f"{error_x}")
            if error_x <= self.error_threshold:
                throttle_float = float(self.no_error_throttle)
            else:
                throttle_float = float(self.error_throttle)
            # clamp steering to [-1.0, 1.0]
            steering_float = float(kp * error_x)
            if steering_float < -1.0:
                steering_float = -1.0
            elif steering_float > 1.0:
                steering_float = 1.0
            self.twist_cmd.angular.z = steering_float
            # Apply the speed multiplier to the throttle
            self.twist_cmd.linear.x = throttle_float * sm
            self.twist_publisher.publish(self.twist_cmd)
        except KeyboardInterrupt:
            self.twist_cmd.linear.x = self.zero_throttle
            self.twist_publisher.publish(self.twist_cmd)


def main(args=None):
    rclpy.init(args=args)
    path_planner_publisher = PathPlanner()
    try:
        rclpy.spin(path_planner_publisher)
        path_planner_publisher.destroy_node()
        rclpy.shutdown()
    except KeyboardInterrupt:
        path_planner_publisher.get_logger().info(f'Shutting down {NODE_NAME}...')
        path_planner_publisher.twist_cmd.linear.x = path_planner_publisher.zero_throttle
        path_planner_publisher.twist_publisher.publish(path_planner_publisher.twist_cmd)
        time.sleep(1)
        path_planner_publisher.destroy_node()
        rclpy.shutdown()
        path_planner_publisher.get_logger().info(f'{NODE_NAME} shut down successfully.')


if __name__ == '__main__':
    main()
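The only changes from the stock lane_guidance_node are the subscription to the /speed topic and the sign_check callback: the throttle published on /cmd_vel is the line-following throttle multiplied by the most recent value received on /speed.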
Speed_Reader_node:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import Float32, Int32, Int32MultiArray
import cv2
from cv_bridge import CvBridge
import numpy as np
import os
import torch
from torchvision import transforms
import time
from PIL import Image as image_pil

NODE_NAME = 'sign_reader_node'
# Topics subscribed/published to in this program
SIGN_TOPIC_NAME = '/speed'
CAMERA_TOPIC_NAME = '/camera/color/image_raw'


class SignReader(Node):
    def __init__(self):
        super().__init__(NODE_NAME)
        self.speed_publisher = self.create_publisher(Float32, SIGN_TOPIC_NAME, 10)
        self.camera_subscriber = self.create_subscription(Image, CAMERA_TOPIC_NAME, self.read, 10)
        self.bridge = CvBridge()
        self.speed_num = Float32()
        self.speed_num.data = 0.0
        print('speed topic started \n')

        # image manipulation values (from the calibration_node)
        self.Hue_low = 84
        self.Hue_high = 179
        self.Saturation_low = 54
        self.Saturation_high = 173
        self.Value_low = 34
        self.Value_high = 148
        self.gray_lower = 32
        self.inverted_filter = 0
        self.start_height = 200
        self.bottom_height = 450
        self.left_width = 0
        self.right_width = 800

        self.model_loc = 'model_1.pb'
        self.class_list = ['no_sign', 'sign_1', 'sign_2', 'sign_3']
        self.threshold = 0.99
        self.sign_prediction = "no_sign"

        # getting the model ready
        self.model = torch.load(self.model_loc)
        self.data_transform = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

    def read(self, data):
        frame = self.bridge.imgmsg_to_cv2(data)
        img = frame[self.start_height:self.bottom_height, self.left_width:self.right_width]
        # changing color space to HSV
        hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
        # setting threshold limits for white color filter
        lower = np.array([self.Hue_low, self.Saturation_low, self.Value_low])
        upper = np.array([self.Hue_high, self.Saturation_high, self.Value_high])
        mask = cv2.inRange(hsv, lower, upper)
        if self.inverted_filter == 1:
            bitwise_mask = cv2.bitwise_and(img, img, mask=cv2.bitwise_not(mask))
        else:
            bitwise_mask = cv2.bitwise_and(img, img, mask=mask)
        # changing to gray color space
        gray = cv2.cvtColor(bitwise_mask, cv2.COLOR_BGR2GRAY)
        # changing to black and white color space
        gray_upper = 255
        (dummy, blackAndWhiteImage) = cv2.threshold(gray, self.gray_lower, gray_upper, cv2.THRESH_BINARY)

        # running the model on the image
        # converting black and white back to RGB
        rgb = cv2.cvtColor(blackAndWhiteImage, cv2.COLOR_GRAY2RGB)
        # converting image from nparray to a PIL image
        frame_altered = image_pil.fromarray(rgb)
        # performing data transformations to tensor
        temp_tensor = self.data_transform(frame_altered)
        # adding a batch dimension, since the prediction pipeline expects a
        # batch of data rather than a single datapoint
        temp_tensor = temp_tensor.unsqueeze(0)
        # making the prediction
        test_predicts = self.model(temp_tensor)
        # finding the index of the highest prediction
        _, pred = torch.max(test_predicts, 1)
        # if that prediction is above a threshold, then change the current prediction
        if test_predicts[0][int(pred[0])] >= self.threshold:
            self.sign_prediction = self.class_list[int(pred[0])]
            if self.sign_prediction != self.class_list[0]:
                print(f'{self.sign_prediction} detected with '
                      f'{float(test_predicts[0][int(pred[0])]):.2f} confidence')
                self.speed_num.data = float(pred[0]) / 2
                self.speed_publisher.publish(self.speed_num)
                # this might break ros, not sure
                time.sleep(2)
        else:
            print("no sign detected")


def main(args=None):
    rclpy.init(args=args)
    sign_reader_publisher = SignReader()
    try:
        rclpy.spin(sign_reader_publisher)
        sign_reader_publisher.destroy_node()
        rclpy.shutdown()
    except KeyboardInterrupt:
        sign_reader_publisher.get_logger().info(f'Shutting down {NODE_NAME}...')
        sign_reader_publisher.destroy_node()
        rclpy.shutdown()
        sign_reader_publisher.get_logger().info(f'{NODE_NAME} shut down successfully.')


if __name__ == '__main__':
    main()
Data_Collection:
import cv2
import numpy as np
from datetime import datetime
import sys

# set image manipulation values based on calibration node
lowH = 84
highH = 179
lowS = 54
highS = 173
lowV = 34
highV = 148
gray_lower = 32
inverted = 0
start_height = 200
bottom_height = 450
left_width = 0
right_width = 800

counter = 0
while True:
    user_input = input("Press <Enter> key to take a photo. Press <Ctrl> + <c> to exit")
    if user_input == "":
        # Image processing from slider values
        cap = cv2.VideoCapture(0)
        ret, frame = cap.read()
        img = frame[start_height:bottom_height, left_width:right_width]
        # changing color space to HSV
        hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
        lower = np.array([lowH, lowS, lowV])
        higher = np.array([highH, highS, highV])
        mask = cv2.inRange(hsv, lower, higher)
        if inverted == 1:
            bitwise_mask = cv2.bitwise_and(img, img, mask=cv2.bitwise_not(mask))
        else:
            bitwise_mask = cv2.bitwise_and(img, img, mask=mask)
        # changing to gray color space
        gray = cv2.cvtColor(bitwise_mask, cv2.COLOR_BGR2GRAY)
        # changing to black and white color space
        gray_upper = 255
        (dummy, blackAndWhiteImage) = cv2.threshold(gray, gray_lower, gray_upper, cv2.THRESH_BINARY)

        # destination folders passed as sys.argv[1]:
        #   '~/data/sign_1/train'   '~/data/sign_1/val'
        #   '~/data/sign_2/train'   '~/data/sign_2/val'
        #   '~/data/sign_3/train'   '~/data/sign_3/val'
        if ret:
            cur_time = datetime.now().strftime("%H%M%S")
            capture = cv2.imwrite(sys.argv[1] + cur_time + ".jpg", blackAndWhiteImage)
            counter += 1
            print("Image written to: " + sys.argv[1], capture)
            print("You now have ", counter, " images in " + sys.argv[1])
        else:
            print("Error: could not read a frame from the camera")
        cv2.waitKey(1)
        cap.release()

cv2.destroyAllWindows()
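The destination folder is passed as the first command line argument, and the timestamped filename is appended directly to it, so the trailing slash matters. Assuming the script is saved as data_collection.py (the filename is our choice here, not fixed by the code), collecting training images for sign_1 looks like: python3 data_collection.py ~/data/sign_1/train/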
Model_Live_Tester:
import numpy as np
import cv2
import torch
from torchvision import transforms
import time
from PIL import Image as image_pil

lowH = 84
highH = 179
lowS = 54
highS = 173
lowV = 34
highV = 148
gray_lower = 32
inverted = 0
start_height = 200
bottom_height = 450
left_width = 0
right_width = 800

class_list = ['no_sign', 'sign_1', 'sign_2', 'sign_3']
threshold = 0.70
class_prediction = "None"


def run():
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Cannot open camera")
        exit()

    # point to the model location and get the model ready
    # (load once, outside the capture loop)
    model_loc = 'C:/Users/Robert Pack/Downloads/model_1.pb'
    device = torch.device('cpu')
    model = torch.load(model_loc, device)
    data_transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    while True:
        ret, frame = cap.read()
        img = frame[start_height:bottom_height, left_width:right_width]
        # changing color space to HSV
        hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
        lower = np.array([lowH, lowS, lowV])
        higher = np.array([highH, highS, highV])
        mask = cv2.inRange(hsv, lower, higher)
        if inverted == 1:
            bitwise_mask = cv2.bitwise_and(img, img, mask=cv2.bitwise_not(mask))
        else:
            bitwise_mask = cv2.bitwise_and(img, img, mask=mask)
        # changing to gray color space
        gray = cv2.cvtColor(bitwise_mask, cv2.COLOR_BGR2GRAY)
        # changing to black and white color space
        gray_upper = 255
        (dummy, blackAndWhiteImage) = cv2.threshold(gray, gray_lower, gray_upper, cv2.THRESH_BINARY)

        rgb = cv2.cvtColor(blackAndWhiteImage, cv2.COLOR_GRAY2RGB)
        frame_altered = image_pil.fromarray(rgb)
        # performing data transformations to tensor
        temp_tensor = data_transform(frame_altered)
        # adding a batch dimension, since the prediction pipeline expects a
        # batch of data rather than a single datapoint
        temp_tensor = temp_tensor.unsqueeze(0)
        # making the prediction
        test_predicts = model(temp_tensor)
        # finding the index of the highest prediction
        _, pred = torch.max(test_predicts, 1)
        prob = round(float(test_predicts[0][int(pred[0])]), 2)
        if prob >= threshold:
            class_prediction = class_list[int(pred[0])]
            print(class_prediction + " detected with ", prob, " confidence")
        else:
            print("no class above threshold")

        # display the resulting frame
        cv2.imshow('frame', blackAndWhiteImage)
        if cv2.waitKey(1) == ord('q'):
            break

    # when everything is done, release the capture
    cap.release()
    cv2.destroyAllWindows()


if __name__ == '__main__':
    run()
Training_Pipeline: We got this code from Jacob Glenn Ayers and Team 1
# Import statements
from __future__ import print_function, division
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import copy
from datetime import datetime

# Loading in the datasets
# Data augmentation and normalization for training
# Just normalization for validation
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

# Change this based on the data being worked on
data_dir = '../test_images'
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x])
                  for x in ['train', 'val']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=4, shuffle=True, num_workers=4)
               for x in ['train', 'val']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = image_datasets['train'].classes

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

inputs, classes = next(iter(dataloaders['train']))
out = torchvision.utils.make_grid(inputs)


# function to train the model using transfer learning of resnet18
def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)
        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()  # Set model to evaluate mode
            running_loss = 0.0
            running_corrects = 0
            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)
                # zero the parameter gradients
                optimizer.zero_grad()
                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)
                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            if phase == 'train':
                scheduler.step()
            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]
            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))
            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
        print()
    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))
    # load best model weights
    model.load_state_dict(best_model_wts)
    return model


# Performing the training
model_ft = models.resnet18(pretrained=True)
num_ftrs = model_ft.fc.in_features
# The final layer is replaced so it outputs one score per class
# (nn.Linear(num_ftrs, len(class_names)))
print("Relevant Classes -\n")
print(class_names)
model_ft.fc = nn.Linear(num_ftrs, len(class_names))
model_ft = model_ft.to(device)
criterion = nn.CrossEntropyLoss()
# Observe that all parameters are being optimized
optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.001, momentum=0.9)
# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

model_ft = train_model(model_ft, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=10)

# Saving the model
cur_time = datetime.now().strftime("%H%M%S")
torch.save(model_ft, "model_" + cur_time + ".pb")
Speed_Node: the node we used to test publishing to the lane_guidance_node to change the speed
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32, Int32, Int32MultiArray
import time
import os

NODE_NAME = 'speed_node'
SIGN_TOPIC_NAME = '/speed'


class SignReader(Node):
    def __init__(self):
        super().__init__(NODE_NAME)
        self.speed_publisher = self.create_publisher(Float32, SIGN_TOPIC_NAME, 10)
        self.speed_num = Float32()
        self.speed_num.data = 1.0
        print('speed topic started \n')
        self.frequency = 1
        self.timer_period = 1 / self.frequency
        self.timer = self.create_timer(self.timer_period, self.read)  # Create the timer

    def read(self):
        print('trying input \n')
        try:
            # blocks inside the timer callback waiting for user input,
            # then clamps the requested speed multiplier to [0.0, 2.0]
            num_in = float(input("Set Speed: "))
            if num_in < 0.0:
                num_in = 0.0
            elif num_in > 2.0:
                num_in = 2.0
            self.speed_num.data = num_in
            self.speed_publisher.publish(self.speed_num)
        except KeyboardInterrupt:
            self.speed_num.data = 1.0
            self.speed_publisher.publish(self.speed_num)


def main(args=None):
    rclpy.init(args=args)
    sign_reader_publisher = SignReader()
    try:
        rclpy.spin(sign_reader_publisher)
        sign_reader_publisher.destroy_node()
        rclpy.shutdown()
    except KeyboardInterrupt:
        sign_reader_publisher.get_logger().info(f'Shutting down {NODE_NAME}...')
        time.sleep(1)
        sign_reader_publisher.destroy_node()
        rclpy.shutdown()
        sign_reader_publisher.get_logger().info(f'{NODE_NAME} shut down successfully.')


if __name__ == '__main__':
    main()