Why gRPC Over REST?
REST with JSON is the default for public APIs. For internal microservices, gRPC often wins on all three dimensions that matter most: performance (Protobuf binary is 3–10x smaller than JSON), type safety (schema-first with generated stubs in every language), and streaming (bidirectional streams are first-class, not bolted on).
gRPC uses HTTP/2 under the hood — multiplexed streams, header compression, and persistent connections all come for free. When your order service calls inventory 10,000 times per second, those gains matter.
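The size gap is easy to see even without generated Protobuf classes. As a rough stand-in (plain `struct` packing, not real Protobuf encoding, which uses tagged varints), compare the same record serialized both ways:

```python
import json
import struct

record = {"id": 12345, "name": "Alice", "age": 30}

as_json = json.dumps(record).encode()

# Rough binary stand-in: fixed-width packing with no field names on the wire.
# Real Protobuf differs in detail, but the principle is the same: keys and
# punctuation disappear from the payload.
as_binary = struct.pack("<I5sB", record["id"], record["name"].encode(), record["age"])

print(len(as_json), len(as_binary))  # 41 10
```

The repeated keys and quoting are pure overhead in JSON; in a binary format only the values (plus small tags) travel over the wire.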
You can validate Protobuf-to-JSON conversions using the DevKits JSON Formatter, and use the Base64 Encoder/Decoder to inspect binary Protobuf payloads.
Installation
```shell
pip install grpcio grpcio-tools

# Optional: server reflection, so CLI tools like grpcurl can discover your services
pip install grpcio-reflection
```

Async support (`grpc.aio`) ships with `grpcio` itself — no extra package is needed.
Define Your Protobuf Schema
Everything starts with a .proto file. This is your source of truth — both the server and every client generate code from the same schema.
```proto
// user_service.proto
syntax = "proto3";

package users;

// The service definition
service UserService {
  rpc GetUser (GetUserRequest) returns (User);
  rpc CreateUser (CreateUserRequest) returns (User);
  rpc ListUsers (ListUsersRequest) returns (stream User);
  rpc BulkCreateUsers (stream CreateUserRequest) returns (BulkCreateResponse);
}

message GetUserRequest {
  string user_id = 1;
}

message CreateUserRequest {
  string name = 1;
  string email = 2;
  int32 age = 3;
}

message User {
  string id = 1;
  string name = 2;
  string email = 3;
  int32 age = 4;
  int64 created_at = 5; // Unix timestamp
}

message ListUsersRequest {
  int32 page_size = 1;
  string page_token = 2;
}

message BulkCreateResponse {
  int32 created_count = 1;
  repeated string user_ids = 2;
}
```
Generate Python Stubs
```shell
# Generates user_service_pb2.py (messages) and user_service_pb2_grpc.py (stubs)
python -m grpc_tools.protoc \
    -I. \
    --python_out=. \
    --grpc_python_out=. \
    user_service.proto
```
Add this to a Makefile so your CI always regenerates stubs from the .proto file — never commit manually edited generated files.
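A minimal target might look like this (file names are illustrative):

```make
# Regenerate gRPC stubs from the schema (run locally and in CI)
PROTOS := user_service.proto

.PHONY: proto
proto:
	python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. $(PROTOS)
```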
Implement the Server
```python
import time
import uuid
from concurrent import futures

import grpc

import user_service_pb2 as pb2
import user_service_pb2_grpc as pb2_grpc

# In-memory store for this example
USERS: dict[str, pb2.User] = {}


class UserServiceServicer(pb2_grpc.UserServiceServicer):
    def GetUser(self, request, context):
        user = USERS.get(request.user_id)
        if not user:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"User {request.user_id!r} not found")
            return pb2.User()
        return user

    def CreateUser(self, request, context):
        # Validation
        if not request.email or "@" not in request.email:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details("Invalid email address")
            return pb2.User()
        user = pb2.User(
            id=str(uuid.uuid4()),
            name=request.name,
            email=request.email,
            age=request.age,
            created_at=int(time.time()),
        )
        USERS[user.id] = user
        return user

    def ListUsers(self, request, context):
        """Server-side streaming: yields users one by one."""
        page_size = request.page_size or 10
        for i, user in enumerate(USERS.values()):
            if i >= page_size:
                break
            yield user  # Each yield sends one message to the client

    def BulkCreateUsers(self, request_iterator, context):
        """Client-side streaming: reads all requests, returns one response."""
        created = []
        for req in request_iterator:
            user = pb2.User(
                id=str(uuid.uuid4()),
                name=req.name,
                email=req.email,
                age=req.age,
                created_at=int(time.time()),
            )
            USERS[user.id] = user
            created.append(user.id)
        return pb2.BulkCreateResponse(
            created_count=len(created),
            user_ids=created,
        )


def serve(port: int = 50051):
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        options=[
            ("grpc.max_send_message_length", 10 * 1024 * 1024),  # 10 MB
            ("grpc.max_receive_message_length", 10 * 1024 * 1024),
        ],
    )
    pb2_grpc.add_UserServiceServicer_to_server(UserServiceServicer(), server)
    server.add_insecure_port(f"[::]:{port}")
    server.start()
    print(f"gRPC server running on port {port}")
    server.wait_for_termination()


if __name__ == "__main__":
    serve()
```
Implement the Client
```python
import grpc

import user_service_pb2 as pb2
import user_service_pb2_grpc as pb2_grpc


def get_channel(host: str = "localhost", port: int = 50051):
    # Use grpc.secure_channel() in production with TLS
    return grpc.insecure_channel(
        f"{host}:{port}",
        options=[("grpc.enable_retries", 1)],
    )


def demo():
    with get_channel() as channel:
        stub = pb2_grpc.UserServiceStub(channel)

        # Unary RPC
        user = stub.CreateUser(pb2.CreateUserRequest(
            name="Alice",
            email="[email protected]",
            age=30,
        ))
        print(f"Created: {user.id} — {user.name}")

        # Unary RPC with error handling
        try:
            stub.GetUser(pb2.GetUserRequest(user_id="nonexistent"))
        except grpc.RpcError as e:
            print(f"Error [{e.code()}]: {e.details()}")

        # Server-side streaming
        print("\nAll users:")
        for u in stub.ListUsers(pb2.ListUsersRequest(page_size=100)):
            print(f"  - {u.name} ({u.email})")

        # Client-side streaming
        def user_generator():
            for name, email in [("Bob", "[email protected]"), ("Carol", "[email protected]")]:
                yield pb2.CreateUserRequest(name=name, email=email, age=25)

        result = stub.BulkCreateUsers(user_generator())
        print(f"\nBulk created {result.created_count} users")


if __name__ == "__main__":
    demo()
```
Async gRPC with asyncio
For high-throughput services, use the async gRPC API to avoid blocking threads:
```python
import asyncio

import grpc.aio  # importing the submodule also binds `grpc`

import user_service_pb2 as pb2
import user_service_pb2_grpc as pb2_grpc


async def async_demo():
    async with grpc.aio.insecure_channel("localhost:50051") as channel:
        stub = pb2_grpc.UserServiceStub(channel)

        user = await stub.CreateUser(pb2.CreateUserRequest(
            name="Dave",
            email="[email protected]",
            age=28,
        ))
        print(f"Async created: {user.name}")

        # Async server streaming
        async for u in stub.ListUsers(pb2.ListUsersRequest(page_size=50)):
            print(f"  {u.name}")


asyncio.run(async_demo())
```
Interceptors for Cross-Cutting Concerns
gRPC interceptors are the equivalent of HTTP middleware — perfect for auth, logging, and metrics:
```python
import grpc
from concurrent import futures


class AuthInterceptor(grpc.ServerInterceptor):
    def intercept_service(self, continuation, handler_call_details):
        metadata = dict(handler_call_details.invocation_metadata)
        token = metadata.get("authorization", "")
        if not token.startswith("Bearer "):
            # Terminate the call before it reaches the servicer. context.abort()
            # raises, so no response message needs to be serialized.
            def abort(request, context):
                context.abort(grpc.StatusCode.UNAUTHENTICATED, "Missing or invalid token")

            return grpc.unary_unary_rpc_method_handler(abort)
        return continuation(handler_call_details)


# Add interceptor to server
server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10),
    interceptors=[AuthInterceptor()],
)
```
gRPC vs REST: When to Choose What
| Factor | gRPC | REST/JSON |
|---|---|---|
| Internal microservices | Excellent | Good |
| Public API / browser clients | Needs gRPC-Web proxy | Excellent |
| Real-time streaming | Native | SSE / WebSocket workaround |
| Schema enforcement | Strict (Protobuf) | Optional (OpenAPI) |
| Human readability | Binary (needs tooling) | Human-readable JSON |
| Payload size | 3–10x smaller | Larger (verbose keys) |
Common Pitfalls
Forgetting to regenerate stubs after schema changes
If you change a field number in the .proto file, old clients will silently misread the data. Never change existing field numbers — only add new ones. Wire compatibility is the key constraint of Protobuf evolution.
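The reason is visible in the wire format itself: Protobuf prefixes every field with a tag built from the field number, so renumbering a field changes the bytes on the wire. A tiny sketch of the tag encoding:

```python
def tag(field_number: int, wire_type: int) -> int:
    # Each Protobuf field starts with a tag: (field_number << 3) | wire_type
    return (field_number << 3) | wire_type

LEN = 2  # wire type for length-delimited fields (strings, nested messages)

print(f"0x{tag(1, LEN):02x}")  # 0x0a — e.g. `string name = 1;`
print(f"0x{tag(2, LEN):02x}")  # 0x12 — renumber that field and the bytes change
```

An old client decoding `0x12` will look up whatever its stale schema says field 2 is, with no error raised.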
Blocking in async servicers
If you use grpc.aio on the server but call a blocking database client inside a servicer, you'll starve the event loop. Use async DB clients (asyncpg, Motor, SQLAlchemy async) or wrap blocking calls with asyncio.to_thread().
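When an async DB client isn't an option, a minimal sketch of the `asyncio.to_thread()` escape hatch (the blocking call here is a stand-in):

```python
import asyncio
import time


def blocking_query() -> str:
    time.sleep(0.1)  # stand-in for a blocking database call
    return "row"


async def get_user_row() -> str:
    # Runs the blocking call in a worker thread; the event loop stays free
    # to serve other RPCs while this one waits.
    return await asyncio.to_thread(blocking_query)


print(asyncio.run(get_user_row()))  # row
```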
Missing TLS in production
grpc.insecure_channel is for development only. In production, always use TLS. With Kubernetes, use a service mesh (Istio, Linkerd) for mTLS between services — no code changes required.
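If you do terminate TLS in the application, the wiring is only a few lines (file paths and hostname here are illustrative):

```python
import grpc

# Client side: trust your CA bundle
with open("ca.pem", "rb") as f:
    credentials = grpc.ssl_channel_credentials(root_certificates=f.read())
channel = grpc.secure_channel("user-service.internal:50051", credentials)

# Server side: serve with a (private key, certificate chain) pair
server_creds = grpc.ssl_server_credentials([(private_key_bytes, cert_chain_bytes)])
server.add_secure_port("[::]:50051", server_creds)
```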
Summary
- Define your schema in .proto files and generate stubs — the schema is the contract
- gRPC supports four patterns: unary, server streaming, client streaming, and bidirectional streaming
- Use interceptors for auth, logging, and tracing — keep servicers clean
- Prefer grpc.aio for high-throughput services and pair with async DB clients
- Never change field numbers; only add new fields to maintain wire compatibility