The Server in Reactive Programming

Explore the concept of the server in reactive programming within Spring Boot applications. Understand how reactive streams work by processing asynchronous data, transforming it using mapping functions, and triggering actions only upon subscription. Gain insights into managing signals such as onNext, onError, and onComplete to build efficient web controllers that handle requests without blocking.

To focus on the concept of a server delivering those asynchronous dishes we studied, let’s write some more code!

A SimpleServer

Let’s take a look at the code for the SimpleServer class below:

Java
class SimpleServer {
    private final KitchenService kitchen;

    SimpleServer(KitchenService kitchen) { // 1
        this.kitchen = kitchen;
    }

    Flux<Dish> doingMyJob() { // 2
        return this.kitchen.getDishes()
            .map(dish -> Dish.deliver(dish)); // 3
    }
}

This SimpleServer class has the following features:

  1. At marker 1 (the constructor), whoever creates an instance of SimpleServer must provide it with an instance of KitchenService. This is known as constructor injection.

  2. At marker 2, the doingMyJob() function, which a manager can tap, asks the kitchen for dishes by calling getDishes().

  3. At marker 3, after asking the kitchen for dishes, it uses .map() to define a handler that says what to do with each dish when it arrives. In this case, it invokes the deliver(dish) function.

Note: The deliver(dish) function of the Dish class sets the dish delivered state to true.

With this code, we’ve defined a simple reactive consumer. It invokes another reactive service and transforms the results.

Look closely and notice that SimpleServer retrieves a Flux of Dish objects from the kitchen and returns that same type. The difference is that the kitchen produces a cooked series of entrées, while the SimpleServer produces a delivered series of entrées.

We are using a Java 8 lambda function, dish -> Dish.deliver(dish), in the code above. Lambda is a fancy way of saying anonymous function. On the left side of the arrow are the inputs, dish in this case. On the right side of the arrow is what we do with it. The SimpleServer invokes the deliver() function of Dish. This function is applied to every instance of Dish the kitchen sends through the Flux. Because map() transforms one Flux into another, its mapping function must return something. That’s why our deliver() function can’t have a void return type. Instead, it returns an instance of Dish.
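To make the mapping function concrete, here is a minimal sketch of what a Dish class with a non-void deliver() might look like. It is an assumption, not the course's actual class: the field names are inferred from the terminal output shown later in this lesson, and returning a delivered copy (rather than changing the original object) is a choice made for this sketch. It uses only the JDK, so it runs without Reactor.

```java
import java.util.function.Function;

class DishSketch {

    // Hypothetical stand-in for the course's Dish class; the fields are
    // inferred from the printed output, not taken from the real source.
    static final class Dish {
        private final String description;
        private final boolean delivered;

        Dish(String description) {
            this(description, false);
        }

        private Dish(String description, boolean delivered) {
            this.description = description;
            this.delivered = delivered;
        }

        // Returns a Dish (not void) so it can be used as a mapping function.
        // Assumption: a delivered copy is returned instead of mutating the input.
        static Dish deliver(Dish dish) {
            return new Dish(dish.description, true);
        }

        boolean isDelivered() {
            return delivered;
        }

        @Override
        public String toString() {
            return "Dish{description='" + description + "', delivered=" + delivered + "}";
        }
    }

    public static void main(String[] args) {
        // The same lambda the server passes to map(), held in a variable.
        Function<Dish, Dish> deliverIt = dish -> Dish.deliver(dish);

        Dish cooked = new Dish("Sesame chicken");
        System.out.println(cooked);                  // delivered=false
        System.out.println(deliverIt.apply(cooked)); // delivered=true
    }
}
```

Because the lambda has the shape Function<Dish, Dish>, it fits anywhere a one-in, one-out transformation is expected, which is the shape map() asks for.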

Note: We’ll use the Flux.map() function throughout this course, so stop and reread the code to make sure you understand what it’s doing.

Project Reactor provides a rich programming model. Not only can we do functional transforms, but we can also hook into the Reactive Streams life cycle, which includes the onNext(), onError(), and onComplete() signals. These hooks are something else missing from Future objects.

To see some of this in action, let’s imagine having a very polite service.

A PoliteServer

Let’s take a look at the code for the PoliteServer class below:

Java
class PoliteServer {
    private final KitchenService kitchen;

    PoliteServer(KitchenService kitchen) {
        this.kitchen = kitchen;
    }

    Flux<Dish> doingMyJob() {
        return this.kitchen.getDishes()
            .doOnNext(dish -> System.out.println("Thank you for " + dish + "!"))
            .doOnError(error -> System.out.println("So sorry about "
                + error.getMessage()))
            .doOnComplete(() -> System.out.println("Thanks for all your hard work!"))
            .map(Dish::deliver);
    }
}

This server is initialized the same way as SimpleServer and handles each dish the same way. Its doingMyJob() function differs as follows:

  • With .doOnNext(), we add an extra bit of functionality that thanks the kitchen every time an onNext() signal delivers a dish.

  • With .doOnError(), we register some code to run anytime the onError() signal happens.

  • With .doOnComplete(), we register some code to run when the kitchen signals it’s done with onComplete().
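The three hooks above mirror the three signals a subscriber can receive. As a rough, dependency-free illustration, the sketch below uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which copy the Reactive Streams contracts. The kitchen() publisher and the serve() helper are hypothetical names, and the publisher is deliberately simplified: it pushes everything on the first request and ignores back-pressure, which a real implementation such as Reactor does not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class SignalSketch {

    // Hypothetical, simplified publisher: emits every dish on the first
    // request, then ends with either onError or onComplete.
    static Flow.Publisher<String> kitchen(List<String> dishes, boolean fail) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean started;

            @Override
            public void request(long n) {
                if (started) {
                    return;
                }
                started = true;
                for (String dish : dishes) {
                    subscriber.onNext(dish);   // one onNext signal per dish
                }
                if (fail) {
                    subscriber.onError(new IllegalStateException("a broken stove"));
                } else {
                    subscriber.onComplete();   // terminal signal: no more dishes
                }
            }

            @Override
            public void cancel() {
                // cancellation is not modelled in this sketch
            }
        });
    }

    // Records each signal, much like the doOnNext/doOnError/doOnComplete hooks print it.
    static List<String> serve(Flow.Publisher<String> kitchen) {
        List<String> log = new ArrayList<>();
        kitchen.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String dish) { log.add("Thank you for " + dish + "!"); }
            @Override public void onError(Throwable t) { log.add("So sorry about " + t.getMessage()); }
            @Override public void onComplete() { log.add("Thanks for all your hard work!"); }
        });
        return log;
    }

    public static void main(String[] args) {
        List<String> menu = List.of("Sesame chicken", "Lo mein noodles, plain");
        serve(kitchen(menu, false)).forEach(System.out::println);
        serve(kitchen(menu, true)).forEach(System.out::println);
    }
}
```

Note that a stream ends with exactly one terminal signal: either onComplete() or onError(), never both.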

What’s not shown is the fact that we can use these methods more than once. Register all the handlers we need!

While we don’t have to use these signals directly in our own code, being aware of them gives us the most options in our code.

We can create all the .doOnNext() operations we like:

return this.kitchen.getDishes()
    .doOnNext(
        dish -> System.out.println("Thank you for " + dish + "!"))
    .doOnNext(
        dish -> System.out.println("Marking the ticket as done."))
    .doOnNext(dish -> System.out.println("Grabbing some silverware."))
    .map(Dish::deliver);

However, it’s currently recommended to register multiple handlers inside a single .doOnNext():

return this.kitchen.getDishes()
    .doOnNext(dish -> {
        System.out.println("Thank you for " + dish + "!");
        System.out.println("Marking the ticket as done.");
        System.out.println("Grabbing some silverware.");
    })
    .map(Dish::deliver);

While separate callbacks may, at first, seem like a handy way to avoid getting things tangled together, it’s actually more performant to reduce the number of callbacks. We can still maintain separation by grouping the functions themselves in proper classes. It’s about keeping different tasks and functions free and clear of each other.
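One way to keep tasks separate while still registering a single callback is to give each task its own small function and chain them with the JDK's Consumer.andThen(). The sketch below is only an illustration under assumptions: the class and method names are hypothetical, and dishes are plain strings so that it runs without Reactor. Since doOnNext() accepts a Consumer, the combined handler could in principle be passed to one .doOnNext() call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class HandlerSketch {

    // Each task lives in its own small factory method (hypothetical names),
    // so the tasks stay separate even though they run in one callback.
    static Consumer<String> thankKitchen(List<String> out) {
        return dish -> out.add("Thank you for " + dish + "!");
    }

    static Consumer<String> markTicket(List<String> out) {
        return dish -> out.add("Marking the ticket as done.");
    }

    static Consumer<String> grabSilverware(List<String> out) {
        return dish -> out.add("Grabbing some silverware.");
    }

    // Chains the three tasks into a single Consumer that runs them in order.
    static Consumer<String> allHandlers(List<String> out) {
        return thankKitchen(out)
                .andThen(markTicket(out))
                .andThen(grabSilverware(out));
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        allHandlers(out).accept("Sesame chicken");
        out.forEach(System.out::println);
    }
}
```

The messages are collected in a list rather than printed directly only to make the behavior easy to check; writing to System.out inside each task would work the same way.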

So far, we’ve defined a kitchen and different types of servers. We’ve also seen how to transform things while also responding to Reactive Streams signals.

What we haven’t seen yet is how to actually start the flow, or discussed what that means. In Project Reactor, we can define all the flows and handlers we need, but nothing actually happens until we subscribe. Subscription is key. This isn’t just part of Reactor, but is instead a concept baked into that tiny Reactive Streams spec mentioned earlier. No data or activity materializes until someone asks for it. The following code shows what happens when the restaurant asks the server to do their job:

Java
class PoliteRestaurant {
    public static void main(String... args) {
        PoliteServer server = new PoliteServer(new KitchenService());

        server.doingMyJob().subscribe(
            dish -> System.out.println("Consuming " + dish),
            throwable -> System.err.println(throwable));
    }
}

The first statement inside the main() method creates the kitchen and the server. It’s a simple way to wire up the server.

The second statement is where things get interesting. We call server.doingMyJob() followed by subscribe(). The doingMyJob() function, as shown earlier, gives us a Flux<Dish>, but nothing happens yet. Remember, it’s just a placeholder for when things arrive.

The KitchenService may be hard-wired with three dishes, but nothing happens when we call doingMyJob()! In Reactor-based apps, nothing happens until we subscribe.

Note: The act of subscription refers to when we subscribe and start asking for results. Project Reactor is inherently lazy. Nothing happens until someone subscribes and starts pulling. Connections aren’t opened, web requests aren’t processed, and web filters aren’t activated until someone subscribes and starts asking for data. In this case, the .subscribe() call includes a Java 8 Consumer, which is the code that pulls in this scenario.
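The laziness described here can be observed without Reactor. The sketch below again leans on the JDK's java.util.concurrent.Flow interfaces (Java 9+) as a stand-in; the one-dish kitchen() publisher and the consume() helper are hypothetical and far simpler than anything Reactor does. The point is only that creating the publisher records nothing, while subscribing starts the work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class LazySketch {

    // Hypothetical one-dish kitchen. The lambda body is the subscribe() method,
    // so none of it runs when the publisher is merely created.
    static Flow.Publisher<String> kitchen(List<String> events) {
        return subscriber -> {
            events.add("kitchen starts cooking");
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean served;

                @Override
                public void request(long n) {
                    if (served) {
                        return;                 // emit the single dish only once
                    }
                    served = true;
                    subscriber.onNext("Sesame chicken");
                    subscriber.onComplete();
                }

                @Override
                public void cancel() {
                    served = true;
                }
            });
        };
    }

    // Subscribes and asks for data; this is the step that starts the flow.
    static void consume(Flow.Publisher<String> dishes, List<String> events) {
        dishes.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String dish) { events.add("Consuming " + dish); }
            @Override public void onError(Throwable t) { events.add("error: " + t); }
            @Override public void onComplete() { events.add("done"); }
        });
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        Flow.Publisher<String> dishes = kitchen(events);   // defines the flow only
        System.out.println("Before subscribe: " + events); // the list is still empty
        consume(dishes, events);                           // now the kitchen runs
        System.out.println("After subscribe: " + events);
    }
}
```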

Looking a little closer, we can see that this usage of subscribe has a Java 8 Consumer as its first argument. This callback is invoked on every instance of Dish (during the Reactive Streams onNext() signal). In this case, it’s another one of those lambda functions, dish -> System.out.println("Consuming " + dish). It prints to the console.

This subscribe method also has a second argument, throwable -> System.err.println(throwable). This is what happens when an error occurs and Reactive Streams sends the onError(throwable) signal.


The expected terminal output is provided below:

Thank you for Dish{description='Sesame chicken', delivered=false}!
Consuming Dish{description='Sesame chicken', delivered=true}
Thank you for Dish{description='Lo mein noodles, plain', delivered=false}!
Consuming Dish{description='Lo mein noodles, plain', delivered=true}
Thank you for Dish{description='Sweet & sour beef', delivered=false}!
Consuming Dish{description='Sweet & sour beef', delivered=true}
Thanks for all your hard work!

Remember: Nothing happens until we subscribe. Reactive connections aren’t opened, reactive databases don’t produce data, and reactive web services don’t write HTTP responses until we subscribe.

Now stop for a second. We’ve been talking about a server ferrying meals from the kitchen to the customer. What if all those dining customers were actually people visiting a website? What if the kitchen in the back was a mixture of data stores and server-side services?

Our role as the server would be taking orders from customers to the kitchen, followed by delivering the requested items, which would quite easily match up with a web controller!

That’s right, our asynchronous, non-blocking approach to fulfilling orders is exactly what a reactive web controller does. A reactive web controller doesn’t take orders to the kitchen and then stand around, waiting for completion. Instead, the server goes out and processes other customer requests. Only when the kitchen signals it’s ready does the server return to carry out that task.