tl;dr

You can use server-side streaming in gRPC Java in a blocking and in a (scalable) non-blocking way. Download github.com/JohannesFKnauf/grpc-java-threading-experiment and start playing. It’s also a nice example of how to implement publish-subscribe using the Java 9 Flow API.

I thought you hate Java?

When I recently complained about Java, some colleagues quoted:

“There are only two kinds of languages: the ones people complain about and the ones nobody uses.”
― Bjarne Stroustrup, The C++ Programming Language

I do hate Java as a programming language, yes. But still, I have returned to the pleasure agony of writing software in plain Java. The reason: I joined a new customer team. It develops shared platform libraries for the JVM – the core features almost every product team in the company will use. Our noble goal is to enable our users to choose whatever JVM technology they like.

Since the bytecode result of compiling plain Java is just the least common denominator for all JVM languages, we settled for this approach.

Will each long-living gRPC server-side streaming task in Java block a thread?

A couple of days ago, a colleague brought up the topic of long-living server-side streaming. He had a use case in mind: a long-living notification channel. A client could subscribe to change notifications by calling a gRPC service with server-side streaming. A service like this example:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "de.metamorphant.examples.grpcticker";
option java_outer_classname = "TickerProto";

package ticker;

service StreamingTicker {
  rpc subscribeTicker (TickRequest) returns (stream TickReply) {}
}

message TickRequest {
}

message TickReply {
  string tick = 1;
}

As you can see, the TickRequest is of no importance. It’s an empty message and will be ignored. The TickReply is returned as a stream, i.e. a client will receive many TickReplys.

Now, the question came up in the team: Will each stream block a thread on the server side? This would limit the scalability of such an approach immensely.

Why is it really simple in Go?

Besides the JVM, we support the Go platform. In Go’s gRPC implementation of server-side streaming, concurrent sessions are handled using goroutines and the stream is closed when a handler function returns (as described in this article about gRPC long-lived streaming in Go). The power of goroutines ensures scalability.

How can I learn more about gRPC Java?

Java’s threads are real system threads. Hence, blocking a method – and thereby a thread – is not an option. A scalable API for Java will have to look different.

Instead of scrolling endless web pages, I decided to perform a little experiment in my spare time. You are just reading about it.

Let a hundred clients stream

In order to demonstrate a scalable solution in Java, I created 2 variants of a simple gRPC service: a blocking and a non-blocking implementation. For testing the scaling behaviour, I created a client app, which opens 100 separate channels consuming from the server. So, each time I start the client app, I create another 100 consumers.

The blocking server

The blocking server app – similar to a classical web application server – will call a method subscribeTicker in a separate thread for each incoming connection. This thread will be reclaimed only when we return from the method.

package de.metamorphant.examples.grpcticker;

import io.grpc.stub.StreamObserver;

public class SyncTickerService extends StreamingTickerGrpc.StreamingTickerImplBase {
  @Override
  public void subscribeTicker(TickRequest request, StreamObserver<TickReply> responseObserver) {
    TickReply reply = TickReply.newBuilder().setTick("Blarz").build();
    for (int i = 0; i < 100; i++) {
      responseObserver.onNext(reply);
      try {
        Thread.sleep(1000);
      } catch (InterruptedException ex) {
        return;
      }
    }
    responseObserver.onCompleted();
  }
}

In this example, I simulate a long-running operation by sleeping inside of the method.

Let’s build it and start a server:

$ ./gradlew clean check assemble installDist

BUILD SUCCESSFUL in 8s
13 actionable tasks: 13 executed
$ tar xf build/distributions/grpc-java-threading-experiment.tar
$ grpc-java-threading-experiment/bin/grpc-java-threading-experiment server

Now, in a separate terminal, let’s spawn 10 clients:

$ for i in {1..10}; do grpc-java-threading-experiment/bin/grpc-java-threading-experiment client & done

The client terminal will just deliver all the streamed messages:

14:48:58.632 [Thread-61] INFO  d.m.e.grpcticker.TickStreamClient - Blarz
14:48:58.637 [Thread-57] INFO  d.m.e.grpcticker.TickStreamClient - Blarz
14:48:58.637 [Thread-66] INFO  d.m.e.grpcticker.TickStreamClient - Blarz
...

You can ignore them.

The server terminal will show us how many threads it spawns to deliver those streams:

$ grpc-java-threading-experiment/bin/grpc-java-threading-experiment server
14:48:47.554 [main] INFO  d.m.examples.grpcticker.GrpcServer - Server started, listening on 54321
14:48:48.373 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 3
14:48:49.374 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 3
14:48:50.374 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 3
14:48:51.375 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 3
14:48:52.375 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 3
14:48:53.376 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 3
14:48:54.377 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 3
14:48:55.377 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 3
14:48:56.377 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 3
14:48:57.378 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 7
14:48:58.378 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 57
14:48:59.378 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 606
14:49:00.379 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 1007
14:49:01.379 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 1007
14:49:02.379 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 1007
...
14:51:37.334 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 1007
14:51:38.342 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 1000
14:51:39.345 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 487
14:51:40.349 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 7
14:51:41.353 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 7
...
...

You can clearly see that for each stream, a separate thread has to be created and maintained – 1000 clients, 1000 threads.

The non-blocking server – or: leveraging Java 9’s Flow API

The non-blocking server app leverages a feature in the gRPC Java API. The StreamObserver API in its very nature is asynchronous. You can implement it in a way, which does not block.

For that purpose, I will implement a publish-subscribe scheme using Java 9’s Flow API. (Some might argue whether it’s really the publish-subscribe pattern or rather the observer pattern or something entirely different; but I don’t want to bore you.)

Here is what we’re aiming for:

Sketch of the publish-subscribe pattern

A single publisher will produce a continuous stream of messages. It will notify all current subscribers upon submission of a new message.

First, I need a notification publisher.

package de.metamorphant.examples.grpcticker;

import java.util.concurrent.SubmissionPublisher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TickPublisher implements Runnable {
  private static final Logger logger = LoggerFactory.getLogger(TickPublisher.class.getName());

  SubmissionPublisher<TickReply> tickPublisher;

  public TickPublisher(SubmissionPublisher<TickReply> tickPublisher) {
    this.tickPublisher = tickPublisher;
  }

  @Override
  public void run() {
    int i = 0;
    while (!Thread.currentThread().isInterrupted()) {
      i++;
      TickReply reply = TickReply.newBuilder().setTick("Blarz " + i).build();
      Integer n = tickPublisher.getNumberOfSubscribers();
      logger.info("Publishing new tick " + i + " to " + n + " subscribers.");
      tickPublisher.submit(reply);
      try {
        Thread.sleep(1000);
      } catch (InterruptedException ex) {
        return;
      }
    }
  }
}

The TickPublisher is going to simulate the notification stream. You inject a SubmissionPublisher tickPublisher on construction.

...
  private void startPublisher() {
    Thread publisher = new Thread(new TickPublisher(tickPublisher));
    publisher.start();
  }
...

The same tickPublisher is going to be used as a communication channel to the subscribers.

...
      GrpcServer server = new GrpcServer(new AsyncTickerService(tickPublisher));
...

The AsyncTickerService implementation of our gRPC service will just create a new subscriber and subscribe it to the tickPublisher, when a new gRPC request arrives. The subscriber will store the observer in order to deliver messages later. The handler method returns. The thread is reclaimed, but the observer stays alive.

Cartoon on the publisher's subscribe method

package de.metamorphant.examples.grpcticker;

import java.util.concurrent.SubmissionPublisher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.grpc.stub.StreamObserver;

public class AsyncTickerService extends StreamingTickerGrpc.StreamingTickerImplBase {
  private static final Logger logger = LoggerFactory.getLogger(AsyncTickerService.class.getName());

  private SubmissionPublisher<TickReply> tickPublisher;

  public AsyncTickerService(SubmissionPublisher<TickReply> tickPublisher) {
    this.tickPublisher = tickPublisher;
  }

  @Override
  public void subscribeTicker(TickRequest request, StreamObserver<TickReply> responseObserver) {
    logger.debug("Registering a new subscriber");
    TickSubscriber tickSubscriber = new TickSubscriber(responseObserver);
    tickPublisher.subscribe(tickSubscriber);
  }
}

The only missing link is the TickSubscribers behaviour. It will have to react to messages from the publisher. First, it will have to receive and store its subscription in the onSubscribe method.

Cartoon of the subscriber's onSubscribe method

Then, it’s going to receive message by message through the onNext method.

Cartoon of the subscriber's onNext method

In addition, I define a termination criterion and close the stream after receiving 100 messages. This enables me to see the behaviour when the number of clients is declining again.

package de.metamorphant.examples.grpcticker;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.grpc.stub.StreamObserver;

public class TickSubscriber implements Subscriber<TickReply> {
  private static final Logger logger = LoggerFactory.getLogger(TickSubscriber.class.getName());

  private AtomicInteger count = new AtomicInteger(0);
  private Subscription subscription;

  private StreamObserver<TickReply> observer;

  public TickSubscriber(StreamObserver<TickReply> observer) {
    this.count = new AtomicInteger(0);
    this.observer = observer;
  }

  @Override
  public void onComplete() {
    logger.debug("Subscriber has been completed.");
  }

  @Override
  public void onError(Throwable t) {
    logger.debug("A problem occured in subscriber: ", t);
  }

  @Override
  public void onNext(TickReply reply) {
    logger.debug("Subscriber received next tick: " + reply.getTick());
    observer.onNext(reply);
    Integer currentCount = count.incrementAndGet();
    if (currentCount > 100) {
      this.subscription.cancel();
      observer.onCompleted();
    } else {
      this.subscription.request(1);
    }
  }

  @Override
  public void onSubscribe(Subscription subscription) {
    logger.debug("Subscribing subscriber");
    this.subscription = subscription;
    this.subscription.request(1);
  }
}

Test Drive II: The Duel

When you run the async-server and spawn 1000 – or even 2000 or more – clients, the server’s thread count will stay low.

$ grpc-java-threading-experiment/bin/grpc-java-threading-experiment async-server
20:06:22.641 [Thread-1] INFO  d.m.e.grpcticker.TickPublisher - Publishing new tick 1 to 0 subscribers.
20:06:22.776 [main] INFO  d.m.examples.grpcticker.GrpcServer - Server started, listening on 54321
20:06:23.588 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 4
20:06:23.644 [Thread-1] INFO  d.m.e.grpcticker.TickPublisher - Publishing new tick 2 to 0 subscribers.
20:06:24.589 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 4
20:06:24.644 [Thread-1] INFO  d.m.e.grpcticker.TickPublisher - Publishing new tick 3 to 0 subscribers.
20:06:25.612 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 4
...
20:06:30.625 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 19
20:06:30.665 [Thread-1] INFO  d.m.e.grpcticker.TickPublisher - Publishing new tick 9 to 0 subscribers.
20:06:31.641 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 38
20:06:31.667 [Thread-1] INFO  d.m.e.grpcticker.TickPublisher - Publishing new tick 10 to 1000 subscribers.
20:06:32.642 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 38
20:06:32.678 [Thread-1] INFO  d.m.e.grpcticker.TickPublisher - Publishing new tick 11 to 1000 subscribers.
...
20:06:40.680 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 38
20:06:40.721 [Thread-1] INFO  d.m.e.grpcticker.TickPublisher - Publishing new tick 19 to 1000 subscribers.
20:06:41.682 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 38
20:06:41.727 [Thread-1] INFO  d.m.e.grpcticker.TickPublisher - Publishing new tick 20 to 1066 subscribers.
20:06:42.688 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 38
20:06:42.728 [Thread-1] INFO  d.m.e.grpcticker.TickPublisher - Publishing new tick 21 to 1991 subscribers.
20:06:43.689 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 38
20:06:43.730 [Thread-1] INFO  d.m.e.grpcticker.TickPublisher - Publishing new tick 22 to 2000 subscribers.
20:06:44.690 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 38
20:06:44.737 [Thread-1] INFO  d.m.e.grpcticker.TickPublisher - Publishing new tick 23 to 2000 subscribers.
...
20:08:19.975 [Thread-1] INFO  d.m.e.grpcticker.TickPublisher - Publishing new tick 109 to 2000 subscribers.
20:08:20.922 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 11
20:08:20.977 [Thread-1] INFO  d.m.e.grpcticker.TickPublisher - Publishing new tick 110 to 2000 subscribers.
20:08:21.923 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 43
20:08:21.995 [Thread-1] INFO  d.m.e.grpcticker.TickPublisher - Publishing new tick 111 to 1000 subscribers.
20:08:22.927 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 43
...
20:08:31.071 [Thread-1] INFO  d.m.e.grpcticker.TickPublisher - Publishing new tick 120 to 1000 subscribers.
20:08:31.947 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 43
20:08:32.072 [Thread-1] INFO  d.m.e.grpcticker.TickPublisher - Publishing new tick 121 to 934 subscribers.
20:08:32.948 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 45
20:08:33.073 [Thread-1] INFO  d.m.e.grpcticker.TickPublisher - Publishing new tick 122 to 9 subscribers.
20:08:33.950 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 45
20:08:34.073 [Thread-1] INFO  d.m.e.grpcticker.TickPublisher - Publishing new tick 123 to 0 subscribers.
20:08:34.951 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 45
20:08:35.074 [Thread-1] INFO  d.m.e.grpcticker.TickPublisher - Publishing new tick 124 to 0 subscribers.
20:08:35.951 [Thread-0] INFO  d.m.e.grpcticker.ThreadCountReporter - Thread count: 45
...

Mission accomplished!

Caveat: StreamObserver may not be thread-safe

⚠⚠⚠
From the StreamObserver API docs: “Implementations are not required to be thread-safe […] Since individual StreamObservers are not thread-safe, if multiple threads will be writing to a StreamObserver concurrently, the application must synchronize calls.”
⚠⚠⚠

In this example code, I circumvent concurrent access to the same StreamObserver by employing the contractual delivery guarantees for all Publisher implementations: “Publishers ensure that Subscriber method invocations for each subscription are strictly ordered in happens-before order.”

Problems for the reader: What determines the thread count, now?

Hint 1: Try replacing the Publisher thread pool with sequential execution

  private SubmissionPublisher<TickReply> tickPublisher = new SubmissionPublisher<>(Runnable::run, 100);

Hint 2: Try to manipulate the gRPC request handler thread pool

    this.server = ServerBuilder.forPort(DEFAULT_PORT)
        .addService(service)
        .executor(Executors.newFixedThreadPool(5))
        .build();


Post header background image by Al3xanderD (Alexander Droeger) from Pixabay.


Contact us