gRPC with Python — Build High-Performance APIs with Protocol Buffers

Build high-performance gRPC APIs with Python: define Protobuf schemas, generate stubs, implement streaming, handle errors, and weigh the trade-offs against REST with practical examples.

Why gRPC Over REST?

REST with JSON is the default for public APIs. For internal microservices, gRPC often wins on all three dimensions that matter most: performance (Protobuf binary is 3–10x smaller than JSON), type safety (schema-first with generated stubs in every language), and streaming (bidirectional streams are first-class, not bolted on).

gRPC uses HTTP/2 under the hood — multiplexed streams, header compression, and persistent connections all come for free. When your order service calls inventory 10,000 times per second, those gains matter.

You can validate Protobuf-to-JSON conversions using the DevKits JSON Formatter, and use the Base64 Encoder/Decoder to inspect binary Protobuf payloads.

Installation

pip install grpcio grpcio-tools

# Async support (grpc.aio) ships with grpcio itself — no extra package needed.
# Optional: server reflection, handy for CLI tools like grpcurl
pip install grpcio-reflection

Define Your Protobuf Schema

Everything starts with a .proto file. This is your source of truth — both the server and every client generate code from the same schema.

// user_service.proto
syntax = "proto3";

package users;

// The service definition
service UserService {
  rpc GetUser (GetUserRequest) returns (User);
  rpc CreateUser (CreateUserRequest) returns (User);
  rpc ListUsers (ListUsersRequest) returns (stream User);
  rpc BulkCreateUsers (stream CreateUserRequest) returns (BulkCreateResponse);
}

message GetUserRequest {
  string user_id = 1;
}

message CreateUserRequest {
  string name = 1;
  string email = 2;
  int32 age = 3;
}

message User {
  string id = 1;
  string name = 2;
  string email = 3;
  int32 age = 4;
  int64 created_at = 5;  // Unix timestamp
}

message ListUsersRequest {
  int32 page_size = 1;
  string page_token = 2;
}

message BulkCreateResponse {
  int32 created_count = 1;
  repeated string user_ids = 2;
}
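
The page_token in ListUsersRequest is deliberately opaque to clients. One common scheme (an illustration only — the service above doesn't prescribe a format) encodes the last-seen user id so the server can resume the scan:

```python
import base64

# Hypothetical opaque-cursor helpers for ListUsersRequest.page_token:
# the server encodes the last user id it returned, then decodes the
# token on the next request to pick up where it left off.
def encode_page_token(last_user_id: str) -> str:
    return base64.urlsafe_b64encode(last_user_id.encode()).decode()

def decode_page_token(token: str) -> str:
    return base64.urlsafe_b64decode(token.encode()).decode()
```

Keeping the token opaque lets you change the cursor format later without breaking clients.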

Generate Python Stubs

# Generates user_service_pb2.py (messages) and user_service_pb2_grpc.py (stubs)
python -m grpc_tools.protoc \
    -I. \
    --python_out=. \
    --grpc_python_out=. \
    user_service.proto

Add this to a Makefile so your CI always regenerates stubs from the .proto file — never commit manually edited generated files.
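
A minimal target might look like this (filenames and paths assumed from the example above — adjust to your layout):

```makefile
# Regenerate gRPC stubs from the schema; run in CI before tests.
proto:
	python -m grpc_tools.protoc -I. \
		--python_out=. \
		--grpc_python_out=. \
		user_service.proto

.PHONY: proto
```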

Implement the Server

import grpc
import time
import uuid
from concurrent import futures

import user_service_pb2 as pb2
import user_service_pb2_grpc as pb2_grpc

# In-memory store for this example
USERS: dict[str, pb2.User] = {}


class UserServiceServicer(pb2_grpc.UserServiceServicer):

    def GetUser(self, request, context):
        user = USERS.get(request.user_id)
        if not user:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"User {request.user_id!r} not found")
            return pb2.User()
        return user

    def CreateUser(self, request, context):
        # Validation
        if not request.email or "@" not in request.email:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details("Invalid email address")
            return pb2.User()

        user = pb2.User(
            id=str(uuid.uuid4()),
            name=request.name,
            email=request.email,
            age=request.age,
            created_at=int(time.time()),
        )
        USERS[user.id] = user
        return user

    def ListUsers(self, request, context):
        """Server-side streaming: yields users one by one"""
        page_size = request.page_size or 10
        for i, user in enumerate(USERS.values()):
            if i >= page_size:
                break
            yield user  # Each yield sends one message to the client

    def BulkCreateUsers(self, request_iterator, context):
        """Client-side streaming: reads all requests, returns one response"""
        created = []
        for req in request_iterator:
            user = pb2.User(
                id=str(uuid.uuid4()),
                name=req.name,
                email=req.email,
                age=req.age,
                created_at=int(time.time()),
            )
            USERS[user.id] = user
            created.append(user.id)
        return pb2.BulkCreateResponse(
            created_count=len(created),
            user_ids=created,
        )


def serve(port: int = 50051):
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        options=[
            ("grpc.max_send_message_length", 10 * 1024 * 1024),  # 10 MB
            ("grpc.max_receive_message_length", 10 * 1024 * 1024),
        ],
    )
    pb2_grpc.add_UserServiceServicer_to_server(UserServiceServicer(), server)
    server.add_insecure_port(f"[::]:{port}")
    server.start()
    print(f"gRPC server running on port {port}")
    server.wait_for_termination()


if __name__ == "__main__":
    serve()
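
The fourth RPC pattern, bidirectional streaming, isn't defined in the schema above, but a servicer for a hypothetical rpc Chat (stream ChatMessage) returns (stream ChatMessage) would be just a generator over the request iterator — sketched here with plain strings standing in for Protobuf messages:

```python
# Hypothetical bidirectional-streaming servicer method.
# Each yield sends a response as soon as its request arrives;
# requests and responses are fully decoupled streams.
def Chat(request_iterator, context=None):
    for message in request_iterator:
        yield f"echo: {message}"

# Simulating a client stream with a plain iterator:
replies = list(Chat(iter(["hi", "bye"])))
```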

Implement the Client

import grpc
import user_service_pb2 as pb2
import user_service_pb2_grpc as pb2_grpc

def get_channel(host: str = "localhost", port: int = 50051):
    # Use grpc.secure_channel() in production with TLS
    return grpc.insecure_channel(
        f"{host}:{port}",
        options=[("grpc.enable_retries", 1)],
    )

def demo():
    with get_channel() as channel:
        stub = pb2_grpc.UserServiceStub(channel)

        # Unary RPC
        user = stub.CreateUser(pb2.CreateUserRequest(
            name="Alice",
            email="[email protected]",
            age=30,
        ))
        print(f"Created: {user.id} — {user.name}")

        # Unary RPC with error handling
        try:
            stub.GetUser(pb2.GetUserRequest(user_id="nonexistent"))
        except grpc.RpcError as e:
            print(f"Error [{e.code()}]: {e.details()}")

        # Server-side streaming
        print("\\nAll users:")
        for u in stub.ListUsers(pb2.ListUsersRequest(page_size=100)):
            print(f"  - {u.name} ({u.email})")

        # Client-side streaming
        def user_generator():
            for name, email in [("Bob", "[email protected]"), ("Carol", "[email protected]")]:
                yield pb2.CreateUserRequest(name=name, email=email, age=25)

        result = stub.BulkCreateUsers(user_generator())
        print(f"\\nBulk created {result.created_count} users")


if __name__ == "__main__":
    demo()
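
The client above turns on retries with grpc.enable_retries; the actual retry policy is supplied as JSON through the grpc.service_config channel option. A sketch of one plausible policy (the values here are assumptions, not recommendations):

```python
import json

# Hypothetical retry policy for the users.UserService defined earlier.
# Passed to the channel as the "grpc.service_config" option; retries
# must also be enabled with ("grpc.enable_retries", 1).
service_config = json.dumps({
    "methodConfig": [{
        "name": [{"service": "users.UserService"}],
        "retryPolicy": {
            "maxAttempts": 4,
            "initialBackoff": "0.1s",
            "maxBackoff": "2s",
            "backoffMultiplier": 2,
            "retryableStatusCodes": ["UNAVAILABLE"],
        },
    }]
})

# channel = grpc.insecure_channel(
#     "localhost:50051",
#     options=[("grpc.enable_retries", 1),
#              ("grpc.service_config", service_config)],
# )
```

Only retry status codes that are safe to repeat — UNAVAILABLE usually is; INVALID_ARGUMENT never is.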

Async gRPC with asyncio

For high-throughput services, use the async gRPC API to avoid blocking threads:

import asyncio

import grpc

import user_service_pb2 as pb2
import user_service_pb2_grpc as pb2_grpc

async def async_demo():
    async with grpc.aio.insecure_channel("localhost:50051") as channel:
        stub = pb2_grpc.UserServiceStub(channel)

        user = await stub.CreateUser(pb2.CreateUserRequest(
            name="Dave",
            email="[email protected]",
            age=28,
        ))
        print(f"Async created: {user.name}")

        # Async server streaming
        async for u in stub.ListUsers(pb2.ListUsersRequest(page_size=50)):
            print(f"  {u.name}")


if __name__ == "__main__":
    asyncio.run(async_demo())

Interceptors for Cross-Cutting Concerns

gRPC interceptors are the equivalent of HTTP middleware — perfect for auth, logging, and metrics:

class AuthInterceptor(grpc.ServerInterceptor):
    def intercept_service(self, continuation, handler_call_details):
        metadata = dict(handler_call_details.invocation_metadata)
        token = metadata.get("authorization", "")

        if not token.startswith("Bearer "):
            def abort(request, context):
                context.set_code(grpc.StatusCode.UNAUTHENTICATED)
                context.set_details("Missing or invalid token")
                return pb2.User()
            return grpc.unary_unary_rpc_method_handler(abort)

        return continuation(handler_call_details)


# Add interceptor to server
server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10),
    interceptors=[AuthInterceptor()],
)

gRPC vs REST: When to Choose What

| Factor | gRPC | REST/JSON |
|---|---|---|
| Internal microservices | Excellent | Good |
| Public API / browser clients | Needs gRPC-Web proxy | Excellent |
| Real-time streaming | Native | SSE / WebSocket workaround |
| Schema enforcement | Strict (Protobuf) | Optional (OpenAPI) |
| Human readability | Binary (needs tooling) | Human-readable JSON |
| Payload size | 3–10x smaller | Larger (verbose keys) |

Common Pitfalls

Forgetting to regenerate stubs after schema changes

If you change a field number in the .proto file, old clients will silently misread the data. Never change existing field numbers — only add new ones. Wire compatibility is the key constraint of Protobuf evolution.
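
A hypothetical evolution of the User message shows the safe pattern — new fields get fresh numbers, and removed ones are reserved so they can never be reused:

```protobuf
// Safe schema evolution (illustrative — not the schema defined above).
message User {
  reserved 4;              // "age" was removed; its number stays retired
  reserved "age";
  string id = 1;
  string name = 2;
  string email = 3;
  int64 created_at = 5;
  string avatar_url = 6;   // new field, new number
}
```

With reserved in place, protoc rejects any future attempt to redefine field 4.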

Blocking in async servicers

If you use grpc.aio on the server but call a blocking database client inside a servicer, you'll starve the event loop. Use async DB clients (asyncpg, Motor, SQLAlchemy async) or wrap blocking calls with asyncio.to_thread().
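
A minimal sketch of the to_thread pattern, using plain asyncio with a stand-in for the blocking DB call (no gRPC dependency; the query function is hypothetical):

```python
import asyncio
import time

def blocking_db_query(user_id: str) -> dict:
    # Stand-in for a synchronous database call (hypothetical)
    time.sleep(0.05)
    return {"id": user_id, "name": "Alice"}

async def get_user(user_id: str) -> dict:
    # Offload the blocking call to a worker thread so the event loop
    # keeps serving other RPCs while the query runs
    return await asyncio.to_thread(blocking_db_query, user_id)

result = asyncio.run(get_user("u-1"))
```

Inside an async servicer, the same await asyncio.to_thread(...) line replaces the direct blocking call.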

Missing TLS in production

grpc.insecure_channel is for development only. In production, always use TLS. With Kubernetes, use a service mesh (Istio, Linkerd) for mTLS between services — no code changes required.

Summary

  • Define your schema in .proto files and generate stubs — the schema is the contract
  • gRPC supports four patterns: unary, server streaming, client streaming, and bidirectional streaming
  • Use interceptors for auth, logging, and tracing — keep servicers clean
  • Prefer grpc.aio for high-throughput services and pair with async DB clients
  • Never change field numbers; only add new fields to maintain wire compatibility