我们这一部分模拟一个低能的阶乘计算器,来比较Service和Action的差异。
碎碎念:Hello米娜桑,这里是英国留学中的杨丝儿。我的博客技术点集中在机器人、人工智能可解释性、数学、物理等等,感兴趣地点个关注吧,持续高质量输出中。
说明以下问题
创建一个名为FactorialService.srv
,定义输入和输出
uint32 num_in
---
uint32 num_out
取消package.xml
中的注释
添加信息到CMakeLists.txt
find_package(catkin REQUIRED COMPONENTS
rospy
std_msgs # 新增
message_generation # 新增
)
# Generate services in the 'srv' folder
add_service_files(
FILES
FactorialService.srv # 新增
)
取消CMakeLists.txt
中的注释
使用$catkin_make
编译
客户端factorial_service_client.py
代码
#!/usr/bin/env python3
import rospy
from factorial_calculator.srv import FactorialService
class FactorialServiceClient():
def __init__(self):
rospy.init_node('service_client_factorial')
rospy.wait_for_service('service/server/factorial/service')
self.factorise = rospy.ServiceProxy('service/server/factorial/service', FactorialService)
def call(self,num):
return self.factorise(num).num_out
if __name__ == '__main__':
client = FactorialServiceClient()
while input_num := input("Input a num: "):
if not input_num.isdigit():
continue
input_num = int(input_num)
result = client.call(input_num)
print(f"Result is: {result}")
服务端factorial_service_server.py
代码
#!/usr/bin/env python3
import rospy
from factorial_calculator.srv import FactorialService,FactorialServiceResponse
class FactorialServiceServer():
def __init__(self):
rospy.init_node('service_server_factorial')
def factorise(self,num):
result = 1
for i in range(1,num+1):
result *= i
rospy.sleep(0.2)
print(result)
return result
def run(self):
service = rospy.Service('service/server/factorial/service', FactorialService, lambda x : FactorialServiceResponse(self.factorise(x.num_in)))
rospy.spin()
if __name__ == '__main__':
server = FactorialServiceServer()
server.run()
创建一个名为FactorialAction.action
uint32 num_in
---
uint32 num_process
---
uint32 num_out
在package.xml
添加信息
<build_depend>actionlib_msgs</build_depend>
<exec_depend>actionlib_msgs</exec_depend>
CMakeLists.txt
# Generate actions in the 'action' folder
add_action_files(
FILES
FactorialAction.action # 新增
)
添加
actionlib_msgs
依赖 find_package(catkin REQUIRED COMPONENTS rospy std_msgs actionlib_msgs # 新增 message_generation ) generate_messages( DEPENDENCIES std_msgs # Or other packages containing msgs actionlib_msgs # 新增 ) catkin_package( # INCLUDE_DIRS include # LIBRARIES factorial_calculator # CATKIN_DEPENDS rospy # DEPENDS system_lib actionlib_msgs # 新增 )
$catkin_make
客户端factorial_action_client.py
代码
#!/usr/bin/env python3
import rospy
import actionlib
from factorial_calculator.msg import FactorialActionAction,FactorialActionGoal,FactorialActionResult
class FactorialActionClient():
def __init__(self):
# 初始化节点
rospy.init_node('action_client_factorial')
# 定义动作客户端
self.client = actionlib.SimpleActionClient('action/server/factorial/action',
FactorialActionAction)
self.client.wait_for_server()
def feedback_callback(self,feedback):
if int(feedback)>100000:
self.client.cancel_goal()
print('Feedback:', feedback)
def call(self,num):
# 定义goal
goal = FactorialActionGoal()
goal.num_goal = num
# 发送goal
self.client.send_goal(goal, feedback_cb=lambda x : self.feedback_callback(x.num_feedback))
# 阻塞等待结果,一般是不会这样使用的
# 这是多线程中的阻塞步骤
self.client.wait_for_result()
return self.client.get_result().num_result
if __name__ == '__main__':
client = FactorialActionClient()
# 循环读取终端输入并上传动作服务器执行。
while input_num := input("Input a num: "):
if not input_num.isdigit():
continue
input_num = int(input_num)
result = client.call(input_num)
print(f"Result is: {result}")
服务端factorial_action_server.py
代码
#!/usr/bin/env python3
import rospy
import actionlib
from factorial_calculator.msg import FactorialActionAction,FactorialActionGoal,FactorialActionResult,FactorialActionFeedback
class FactorialActionServer():
def __init__(self):
rospy.init_node('action_server_factorial')
self.server = actionlib.SimpleActionServer('action/server/factorial/action',
FactorialActionAction,
lambda x : self.factorise(x.num_goal),
auto_start=False)
def factorise(self,num):
result = 1
if num > 14:
self.server.set_aborted(FactorialActionResult(result),'[aborted]')
return
for i in range(1,num+1):
if self.server.is_preempt_requested():
self.server.set_preempted(FactorialActionResult(result),'[preempted]')
result *= i
rospy.sleep(0.2)
self.server.publish_feedback(FactorialActionFeedback(result))
self.server.set_succeeded(FactorialActionResult(result))
return
def run(self):
self.server.start()
rospy.spin()
if __name__ == '__main__':
server = FactorialActionServer()
server.run()