To work with gRPC in Python, you need to install the following packages:
sudo python3 -m pip install grpcio
python3 -m pip install grpcio-tools
Then you can automatically generate Python code with that command:
pythonpython3 -m grpc_tools.protoc -I./protos --python_out=. --pyi_out=. --grpc_python_out=. ./protos/prc.proto
This gets you the following three files:
prc_pb2_grpc.py
prc_pb2.py
prc_pb2.pyi
Along in the folder is the prc_localhost.pem certificate for the SSL encryption. It is included in the example project.
Import the following libraries for the sample
pythonimport logging import os import time import threading import grpc import prc_pb2 import prc_pb2_grpc
Next, set up the path to the *.pem certificate and establish the GRPC channel
pythonprint("Getting certificate from same folder as the python code...") __location__ = os.path.realpath( os.path.join(os.getcwd(), os.path.dirname(__file__))) with open(os.path.join(__location__, 'prc_localhost.pem'), 'rb') as f: credentials = grpc.ssl_channel_credentials(f.read()) print("Connecting to https://localhost:5001...") with grpc.secure_channel("localhost:5001", credentials) as channel: stub = prc_pb2_grpc.ParametricRobotControlServiceStub(channel)
Ping the PRC Server and choose a unique ID for your robot
pythonresponse = stub.SendPing(prc_pb2.Ping(payload="", time_ms=10)) robot_id = "PRC_Test"
You can then setup your robotic system. The robot driver class and preset robot class reference existing drivers/robots in PRC.
pythonsetup_robot_reply = stub.SetupRobot( prc_pb2.SetupRobotRequest( client_id=robot_id, software_version="0.1", robot_setup=prc_pb2.Robot( friendly_id="KUKA KR10", robot_driver_class="KUKA.KSS_KRL_Driver", preset_robot_class="KUKA.KUKA_KR610R11002", initial_base=prc_pb2.Base( base_id="0", base_frame=prc_pb2.CartesianPosition( cs=prc_pb2.CoordinateSystem( origin=prc_pb2.Vector3( x=0, y=0, z=0 ), x_axis=prc_pb2.Vector3( x=1, y=0, z=0 ), y_axis=prc_pb2.Vector3( x=0, y=1, z=0 ) ) ) ), tool_dictionary={ "0": prc_pb2.Tool( tool_id="0", tool_type=prc_pb2.FrameType.FIXED, tcp=prc_pb2.CartesianPosition( cs=prc_pb2.CoordinateSystem( origin=prc_pb2.Vector3( x=0, y=0, z=0 ), x_axis=prc_pb2.Vector3( x=1, y=0, z=0 ), y_axis=prc_pb2.Vector3( x=0, y=1, z=0 ) ) ) ) } ) ) )
In order to get feedback, we establish the feedback stream and spin up a thread for it.
pythonfeedback_stream = stub.SubscribeRobotFeedback( prc_pb2.SubscribeRobotFeedbackRequest( id=robot_id, ) ) print("Starting separate thread to manage the feedback stream coming from PRC") stop_event = threading.Event() feedback_thread = threading.Thread(target=thread_feedback, args=(stop_event, feedback_stream,)) feedback_thread.start()
The relevant thread looks like this.
pythondef thread_feedback(stop_event, feedback_stream): for feedback in feedback_stream: if stop_event.is_set(): break assert isinstance(feedback, prc_pb2.RobotFeedback) field = feedback.WhichOneof('data_package') if field == "heartbeat_data": print("Feedback thread: Received heartbeat data") elif field == "robot_state_data": print("Feedback thread: Received robot state data") elif field == "settings_data": print("Feedback thread: Received settings data") elif field == "ping_data": print("Feedback thread: Received ping data")
You can now define your movements and group them in
MotionGroup
objects.pythonptp_motion_1 = prc_pb2.MotionCommand( axis_motion=prc_pb2.AxisMotion( target=prc_pb2.JointTarget( axis_values=[-45, -90, 90, 0, 0, 0], speed=[0.1] ) ) ) ptp_motion_2 = prc_pb2.MotionCommand( axis_motion=prc_pb2.AxisMotion( target=prc_pb2.JointTarget( axis_values=[45, -90, 90, 0, 0, 0], speed=[0.15] ) ) ) ptp_motion_group = prc_pb2.MotionGroup( commands=[ptp_motion_1, ptp_motion_2], interpolation="C_PTP", motion_group_type=prc_pb2.MotionGroupType.PTP )
Next, put the motion groups into a task. You get the default
robot_settings
from the initial SetupRobot
call. As they are a dictionary, you can edit them easily.pythontask_reply = stub.AddRobotTask( prc_pb2.AddRobotTaskRequest( id=robot_id, robot_task=prc_pb2.Task( name="Task", type=prc_pb2.TaskType.SIMULATE_AND_EXECUTE_TASK, payload=[prc_pb2.TaskPayload( motion_group_task=ptp_motion_group )] ), robot_settings=prc_pb2.Settings( settings_dictionary=setup_robot_reply.robot_settings.settings_dictionary ) ) )
Among the output of the
task_reply
is e.g. the code
object, containing the code to run on the robot.You can go through the simulation through the
GetStimulatedRobotState
call. In the example below, it iterates by 4% every 400ms.pythoni=0 while i < 100: i+=4 robot_state = stub.GetSimulatedRobotState( prc_pb2.GetSimulatedRobotStateRequest( stream_update=False, id=robot_id, normalized_state=i/100.0 ) ) axis_values = robot_state.actual_axis_position.axis_values print("At factor " + str(i/100.0) + " the robot is at A1: " + str(axis_values[0]) + " A2:" + str(axis_values[1]) + " A3:" + str(axis_values[2]) + " A4:" + str(axis_values[3]) + " A5:" + str(axis_values[4]) + " A6:" + str(axis_values[5]) ) time.sleep(0.4)
The full code is available here:
PRC.Integrations
jbraumann • Updated Apr 29, 2025