Package io.mats3.util
Class MatsFuturizer
java.lang.Object
io.mats3.util.MatsFuturizer
- All Implemented Interfaces:
AutoCloseable
An instance of this class acts as a bridge service between the synchronous world of e.g. an HTTP request, and the
asynchronous world of Mats. In a given project, you typically create a single instance of this class upon startup,
and employ it for all such scenarios. In short, in an HTTP service handler, you initialize a Mats flow using
singletonFuturizer.futurizeNonessential(...) (or futurize(...) for full configurability), specifying which Mats
Endpoint to invoke and the request DTO instance, and then you get a CompletableFuture in return. This future will
complete once the invoked Mats Endpoint replies.
It is extremely important to understand that this is NOT how you compose multiple Mats Endpoints together! This is
ONLY supposed to be used when you are in a synchronous context (e.g. in a Servlet, or a Spring @RequestMapping), and
want to interact with the Mats fabric of services.
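As an illustration of the handler-side pattern described above, here is a minimal sketch. It does not use Mats3 itself: NonessentialFuturizer is a hypothetical stand-in that only mirrors the parameter list of futurizeNonessential(..), and it is simplified to complete with the reply DTO directly, whereas the real method returns a CompletableFuture<MatsFuturizer.Reply<T>>. The DTO classes, the traceId format and the endpoint ids are likewise made up for the example.

```java
import java.util.concurrent.CompletableFuture;

final class HttpBridgeSketch {

    /**
     * Hypothetical stand-in for the one MatsFuturizer method used here, so that the sketch
     * compiles without Mats3. Simplification: the real method returns
     * CompletableFuture<MatsFuturizer.Reply<T>>, this one completes with the DTO directly.
     */
    interface NonessentialFuturizer {
        <T> CompletableFuture<T> futurizeNonessential(String traceId, String from, String to,
                Class<T> replyClass, Object request);
    }

    /** Hypothetical request DTO. */
    static final class BalanceRequest {
        final String accountId;
        BalanceRequest(String accountId) { this.accountId = accountId; }
    }

    /** Hypothetical reply DTO. */
    static final class BalanceReply {
        final long balance;
        BalanceReply(long balance) { this.balance = balance; }
    }

    /** What a synchronous "GET-style" HTTP handler could do: initiate, then map the reply to a body. */
    static CompletableFuture<String> handleGetBalance(NonessentialFuturizer futurizer, String accountId) {
        return futurizer.futurizeNonessential(
                "GetBalance:" + accountId,     // traceId (hypothetical format)
                "WebFrontend.getBalance",      // from (hypothetical)
                "AccountService.getBalance",   // to (hypothetical endpoint id)
                BalanceReply.class,
                new BalanceRequest(accountId))
                .thenApply(reply -> "{\"balance\":" + reply.balance + "}");
    }

    /** In-memory fake that replies immediately, standing in for the Mats fabric. */
    static NonessentialFuturizer fakeReplying(long balance) {
        return new NonessentialFuturizer() {
            @Override
            public <T> CompletableFuture<T> futurizeNonessential(String traceId, String from, String to,
                    Class<T> replyClass, Object request) {
                return CompletableFuture.completedFuture(replyClass.cast(new BalanceReply(balance)));
            }
        };
    }

    public static void main(String[] args) {
        // prints {"balance":42}
        System.out.println(handleGetBalance(fakeReplying(42), "acc-1").join());
    }
}
```

The handler only touches the future at the outer edge facing the client. Anything that needs to call further services would, per the warning above, belong in a multi-stage MatsEndpoint rather than in chained futures.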
A question you should ask yourself is how this works in a multi-node setup. For a Mats flow, it does not matter
on which node a given stage of a MatsEndpoint is performed, as it is by design totally stateless wrt. the executing
node: all state resides in the message. However, for a synchronous situation such as an HTTP request, it definitely
matters that the final reply, the one that should complete the returned future, comes in on the same node that issued
the request, as this is where the CompletableFuture instance is, and where the waiting TCP connection is connected!
The trick here is that the final reply is specified to come in on a node-specific Topic, i.e. it literally has
the node name (default being the hostname) as a part of the MatsEndpoint name, and it is a SubscriptionTerminator.
Another aspect to understand is that while Mats "guarantees" that a submitted initiation will flow through the Mats
endpoints, no matter what happens with the processing nodes (unless you employ NonPersistent messaging,
which futurizeNonessential(..) does!), nothing can be guaranteed wrt. the completion of the future: This is
stateful processing. The node where the MatsFuturizer initiation is performed can crash right after the message has
been put on the Mats fabric, and hence the CompletableFuture vanishes along with everything else on that node. The
Mats flow is however already in motion, and will be executed - but when the Reply comes in on the node-specific
Topic, there is no longer any corresponding CompletableFuture to complete. This is also why you should not compose
Mats endpoints using futures, however familiar a CompletableFuture may feel: While a multi-stage
MatsEndpoint is asynchronous, resilient and highly available, and each stage is transactionally performed, with
retries and all the goodness that comes with a message oriented architecture, once you rely on a CompletableFuture,
you are in a synchronous world where a power outage or a reboot can stop the processing midway. Thus, the
MatsFuturizer should only ever be employed at the very outer edge facing the actual client - any other processing
should be performed using MatsEndpoints, and composition of MatsEndpoints should be done using multi-stage
MatsEndpoints.
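The statefulness described above can be sketched with plain JDK classes. This is a simplified model under stated assumptions, not the MatsFuturizer implementation: CorrelationSketch and its methods are hypothetical names, the reply is a plain String, and the only idea taken from this page is that outstanding futures are held in node-local memory keyed by a correlation id (compare the _correlationIdToPromiseMap field and _handleRepliesForPromises(..)).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of one node's in-memory bookkeeping of outstanding futures. */
final class CorrelationSketch {
    private final Map<String, CompletableFuture<String>> outstanding = new HashMap<>();

    /** Registers a future before the request is sent; the id travels with the message. */
    synchronized String register(CompletableFuture<String> future) {
        String correlationId = UUID.randomUUID().toString();
        outstanding.put(correlationId, future);
        return correlationId;
    }

    /**
     * Called when a reply arrives on this node. Returns true if a waiting future was
     * found and completed, false if this node knows nothing about the correlation id.
     */
    boolean handleReply(String correlationId, String reply) {
        CompletableFuture<String> future;
        synchronized (this) {
            future = outstanding.remove(correlationId);
        }
        if (future == null) {
            return false; // e.g. the node restarted: the flow finished, but nobody is waiting
        }
        future.complete(reply);
        return true;
    }

    synchronized int outstandingCount() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        CorrelationSketch node = new CorrelationSketch();
        CompletableFuture<String> future = new CompletableFuture<>();
        String id = node.register(future);

        // A "restarted" node has lost its map, so the same reply finds nothing to complete.
        CorrelationSketch restartedNode = new CorrelationSketch();
        System.out.println(restartedNode.handleReply(id, "hello")); // prints false

        // On the original node the reply completes the waiting future.
        System.out.println(node.handleReply(id, "hello")); // prints true
        System.out.println(future.join());                 // prints hello
    }
}
```

The map lives only in the memory of the initiating node, which is why the reply must arrive on that node, and why a crash there loses the future even though the Mats flow itself runs to completion.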
Note that in the case of pure "GET-style" requests where information is only retrieved and no state in the total
system is changed, everything is a bit more relaxed: If processing fails, the worst thing that happens is a
slightly annoyed user. But if this was an "add order" or "move money" instruction from the user, a mid-processing
failure is rather bad and could require human intervention to clean up. Thus, the futurizeNonessential(..)
method should only be employed for such "GET-style" requests, and any other potentially state changing operations
must employ the generic futurize(..) method.
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | MatsFuturizer.MatsFuturizerTimeoutException | This exception is raised through the CompletableFuture if the timeout specified when getting the CompletableFuture is reached (to get yourself a future, use one of the futurizeXYZ(..) methods).
protected static class | MatsFuturizer.Promise<T> |
static class | MatsFuturizer.Reply<T> | An instance of this class will be the return value of any CompletableFutures created with the MatsFuturizer.
-
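Field Summary
Fields
Modifier and Type | Field | Description
protected final HashMap<String, MatsFuturizer.Promise<?>> | _correlationIdToPromiseMap |
protected final ThreadPoolExecutor | _futureCompleterThreadPool |
protected final ReentrantLock | _internalStateLock |
protected final MatsFactory | _matsFactory |
protected final MatsInitiator | _matsInitiator |
protected final int | _maxOutstandingPromises |
protected MatsFuturizer.Promise<?> | _nextInLineToTimeout |
protected final MatsEndpoint<Void, String> | _replyHandlerEndpoint |
protected boolean | _replyHandlerEndpointStarted |
protected boolean | _runFlag |
protected final String | _terminatorEndpointId |
protected final AtomicInteger | _threadNumber |
protected final Condition | _timeouterPing_InternalStateLock |
protected final PriorityQueue<MatsFuturizer.Promise<?>> | _timeoutSortedPromises |
-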
Constructor Summary
Constructors
Modifier | Constructor | Description
protected | MatsFuturizer(MatsFactory matsFactory, String endpointIdPrefix, int corePoolSize, int maxPoolSize, int maxOutstandingPromises) |
-
Method Summary
Modifier and Type | Method | Description
protected void | _assertFuturizerRunning() |
protected <T> MatsFuturizer.Promise<T> | _createPromise(String traceId, String from, String to, Class<T> replyClass, int timeout, TimeUnit unit) |
protected Object | _deserializeReply(MatsEndpoint.MatsObject matsObject, Class<?> toClass) |
protected <T> void | _enqueuePromise(MatsFuturizer.Promise<T> promise) |
protected void | _handleRepliesForPromises(MatsEndpoint.ProcessContext<Void> context, String correlationId, MatsEndpoint.MatsObject matsObject) |
protected ThreadPoolExecutor | _newThreadPool(int corePoolSize, int maximumPoolSize) |
protected <T> void | _sendRequestToFulfillPromise(String from, String endpointId, String traceId, Object request, MatsInitiator.InitiateLambda extraMessageInit, MatsFuturizer.Promise<T> promise) |
protected void | _startTimeouterThread() |
protected void | _timeoutCompleteExceptionally(MatsFuturizer.Promise<?> promise, String msg) |
protected void | _uncheckedComplete(MatsEndpoint.ProcessContext<Void> context, Object replyObject, MatsFuturizer.Promise<?> promise) |
void | close() | Closes the MatsFuturizer.
static MatsFuturizer | createMatsFuturizer(MatsFactory matsFactory) | Creates a MatsFuturizer, and you should only need one per MatsFactory (which again mostly means one per application or micro-service or JVM).
static MatsFuturizer | createMatsFuturizer(MatsFactory matsFactory, String endpointIdPrefix) | Creates a MatsFuturizer, and you should only need one per MatsFactory (which again mostly means one per application or micro-service or JVM).
static MatsFuturizer | createMatsFuturizer(MatsFactory matsFactory, String endpointIdPrefix, int corePoolSize, int maxPoolSize, int maxOutstandingPromises) | Creates a MatsFuturizer, and you should only need one per MatsFactory (which again mostly means one per application or micro-service or JVM).
<T> CompletableFuture<MatsFuturizer.Reply<T>> | futurize(String traceId, String from, String to, int timeout, TimeUnit unit, Class<T> replyClass, Object request, MatsInitiator.InitiateLambda customInit) | The generic form of initiating a request-message that returns a CompletableFuture, which enables you to tailor all properties.
<T> CompletableFuture<MatsFuturizer.Reply<T>> | futurize(String traceId, String from, String to, Class<T> replyClass, Object request, MatsInitiator.InitiateLambda customInit) | Convenience-variant of the generic futurize(..) form, where the timeout is set to 2.5 minutes.
<T> CompletableFuture<MatsFuturizer.Reply<T>> | futurizeNonessential(String traceId, String from, String to, Class<T> replyClass, Object request) | NOTICE: This variant must only be used for "GET-style" Requests where none of the endpoints the call flow passes will add, remove or alter any state of the system, and where it doesn't matter all that much if a message (and hence the Mats flow) is lost!
int | getOutstandingPromiseCount() |
-
Field Details
-
_matsFactory
-
_matsInitiator
-
_terminatorEndpointId
-
_futureCompleterThreadPool
-
_maxOutstandingPromises
protected final int _maxOutstandingPromises
-
_replyHandlerEndpoint
-
_threadNumber
-
_replyHandlerEndpointStarted
protected volatile boolean _replyHandlerEndpointStarted
-
_internalStateLock
-
_timeouterPing_InternalStateLock
-
_correlationIdToPromiseMap
-
_timeoutSortedPromises
-
_nextInLineToTimeout
-
_runFlag
protected volatile boolean _runFlag
-
-
Constructor Details
-
MatsFuturizer
protected MatsFuturizer(MatsFactory matsFactory, String endpointIdPrefix, int corePoolSize, int maxPoolSize, int maxOutstandingPromises)
-
-
Method Details
-
createMatsFuturizer
public static MatsFuturizer createMatsFuturizer(MatsFactory matsFactory)
Creates a MatsFuturizer, and you should only need one per MatsFactory (which again mostly means one per application or micro-service or JVM). The defaults for the parameters of the fully fledged factory method are identical to those of createMatsFuturizer(MatsFactory, String), but with this variant the 'endpointIdPrefix' is also set, to what is returned by matsFactory.getFactoryConfig().getAppName(). Note that if you - against the above suggestion - create more than one MatsFuturizer for a MatsFactory, then you MUST give them different endpointIdPrefixes, thus you cannot use this method!
- Parameters:
matsFactory - the underlying MatsFactory on which outgoing messages will be sent, and on which the receiving SubscriptionTerminator will be created.
- Returns:
the MatsFuturizer, which is tied to a newly created SubscriptionTerminator.
-
createMatsFuturizer
public static MatsFuturizer createMatsFuturizer(MatsFactory matsFactory, String endpointIdPrefix)
Creates a MatsFuturizer, and you should only need one per MatsFactory (which again mostly means one per application or micro-service or JVM). The number of threads in the future-completer-pool is derived from what matsFactory.getFactoryConfig().getConcurrency() returns at creation time: "corePoolSize" (i.e. min) is concurrency * 4, but at least 5; "maximumPoolSize" (i.e. max) is concurrency * 20, but at least 100. The pool is set up to let non-core threads expire after 5 minutes. The maximum number of outstanding promises is set to 50k.
- Parameters:
matsFactory - the underlying MatsFactory on which outgoing messages will be sent, and on which the receiving SubscriptionTerminator will be created.
endpointIdPrefix - the first part of the endpointId, which typically should be some "class-like" construct denoting the service name, like "OrderService" or "InventoryService", preferably the same prefix you use for all your other endpoints running on this same service. Note: If you create multiple MatsFuturizers for a MatsFactory, this parameter must be different for each instance!
- Returns:
the MatsFuturizer, which is tied to a newly created SubscriptionTerminator.
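The default sizing rule stated above can be written out as a small calculation. This is a sketch of the arithmetic only, using the JDK's ThreadPoolExecutor: the class and method names are hypothetical, and the SynchronousQueue work queue is an assumption made so that the example is complete, since this page does not say which queue the real _newThreadPool(..) uses.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that reproduces the documented default pool sizing. */
final class CompleterPoolSizing {

    /** concurrency * 4, but at least 5. */
    static int corePoolSize(int concurrency) {
        return Math.max(5, concurrency * 4);
    }

    /** concurrency * 20, but at least 100. */
    static int maxPoolSize(int concurrency) {
        return Math.max(100, concurrency * 20);
    }

    /** Builds a pool with those sizes and a 5 minute keep-alive for non-core threads. */
    static ThreadPoolExecutor newPool(int concurrency) {
        return new ThreadPoolExecutor(
                corePoolSize(concurrency),
                maxPoolSize(concurrency),
                5, TimeUnit.MINUTES,
                new SynchronousQueue<>()); // assumption: the queue type is not documented here
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool(10);
        // prints core=40 max=200
        System.out.println("core=" + pool.getCorePoolSize() + " max=" + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

With a concurrency of 1 the lower bounds apply and the pool gets 5 core and 100 maximum threads. The concurrency-derived values only take over above a concurrency of 1 for the core size (2 * 4 = 8) and above 5 for the maximum size (6 * 20 = 120).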
-
createMatsFuturizer
public static MatsFuturizer createMatsFuturizer(MatsFactory matsFactory, String endpointIdPrefix, int corePoolSize, int maxPoolSize, int maxOutstandingPromises)
Creates a MatsFuturizer, and you should only need one per MatsFactory (which again mostly means one per application or micro-service or JVM). With this factory method you can specify the number of threads in the future-completer-pool with the parameters "corePoolSize" and "maxPoolSize", which effectively means min and max. The pool is set up to let non-core threads expire after 5 minutes. You must also specify the max number of outstanding promises; if you want no effective limit, use Integer.MAX_VALUE.
- Parameters:
matsFactory - the underlying MatsFactory on which outgoing messages will be sent, and on which the receiving SubscriptionTerminator will be created.
endpointIdPrefix - the first part of the endpointId, which typically should be some "class-like" construct denoting the service name, like "OrderService" or "InventoryService", preferably the same prefix you use for all your other endpoints running on this same service. Note: If you create multiple MatsFuturizers for a MatsFactory, this parameter must be different for each instance!
corePoolSize - the minimum number of threads in the future-completer-pool of threads.
maxPoolSize - the maximum number of threads in the future-completer-pool of threads.
maxOutstandingPromises - the maximum number of outstanding Promises before new ones are rejected. Should be a fairly high number, e.g. the default of createMatsFuturizer(MatsFactory, String) is 50k.
- Returns:
the MatsFuturizer, which is tied to a newly created SubscriptionTerminator.
-
futurize
public <T> CompletableFuture<MatsFuturizer.Reply<T>> futurize(String traceId, String from, String to, int timeout, TimeUnit unit, Class<T> replyClass, Object request, MatsInitiator.InitiateLambda customInit)
The generic form of initiating a request-message that returns a CompletableFuture, which enables you to tailor all properties. To set interactive-, nonPersistent- or noAudit-flags, or to tack on any "sideloads" to the outgoing message, use the "customInit" parameter, which directly is the InitiateLambda that the MatsFuturizer initiation is using. For a bit more explanation, please read the JavaDoc of futurizeNonessential(..).
- Type Parameters:
T - the type of the reply DTO.
- Parameters:
traceId - TraceId of the resulting Mats call flow, see MatsInitiator.MatsInitiate.traceId(CharSequence)
from - the "from" of the initiation, see MatsInitiator.MatsInitiate.from(String)
to - to which Mats endpoint the request should go, see MatsInitiator.MatsInitiate.to(String)
timeout - how long before the internal timeout-mechanism of MatsFuturizer kicks in and the future is completed exceptionally with a MatsFuturizer.MatsFuturizerTimeoutException.
unit - the unit of time of the 'timeout' parameter.
replyClass - the expected reply DTO class that the requested endpoint replies with.
request - the request DTO that should be sent to the endpoint, see MatsInitiator.MatsInitiate.request(Object)
customInit - the MatsInitiator.InitiateLambda that the MatsFuturizer is employing to initiate the outgoing message, which you can use to tailor the message, e.g. setting the interactive-flag or tacking on "sideloads".
- Returns:
a CompletableFuture which will be resolved with a MatsFuturizer.Reply-instance that contains both some meta-data, and the reply from the requested endpoint.
-
futurize
public <T> CompletableFuture<MatsFuturizer.Reply<T>> futurize(String traceId, String from, String to, Class<T> replyClass, Object request, MatsInitiator.InitiateLambda customInit)
Convenience-variant of the generic futurize(..) form, where the timeout is set to 2.5 minutes. To set interactive-, nonPersistent- or noAudit-flags, or to tack on any "sideloads" to the outgoing message, use the "customInit" parameter, which directly is the InitiateLambda that the MatsFuturizer initiation is using. For a bit more explanation, please read the JavaDoc of futurizeNonessential(..).
- Type Parameters:
T - the type of the reply DTO.
- Parameters:
traceId - TraceId of the resulting Mats call flow, see MatsInitiator.MatsInitiate.traceId(CharSequence)
from - the "from" of the initiation, see MatsInitiator.MatsInitiate.from(String)
to - to which Mats endpoint the request should go, see MatsInitiator.MatsInitiate.to(String)
replyClass - the expected reply DTO class that the requested endpoint replies with.
request - the request DTO that should be sent to the endpoint, see MatsInitiator.MatsInitiate.request(Object)
customInit - the MatsInitiator.InitiateLambda that the MatsFuturizer is employing to initiate the outgoing message, which you can use to tailor the message, e.g. setting the interactive-flag or tacking on "sideloads".
- Returns:
a CompletableFuture which will be resolved with a MatsFuturizer.Reply-instance that contains both some meta-data, and the reply from the requested endpoint.
-
futurizeNonessential
public <T> CompletableFuture<MatsFuturizer.Reply<T>> futurizeNonessential(String traceId, String from, String to, Class<T> replyClass, Object request)
NOTICE: This variant must only be used for "GET-style" Requests where none of the endpoints the call flow passes will add, remove or alter any state of the system, and where it doesn't matter all that much if a message (and hence the Mats flow) is lost!
The goal of this method is to be able to get hold of e.g. account holdings, order statuses etc., for presentation to a user. The thinking is that if such a flow fails because a message of the call flow disappears, this won't result in anything worse than a slightly annoyed user: No important state change, like the adding, deleting or change of an order, will be lost. Also, speed is of the essence. Therefore, non-persistent. At the same time, to make the user super happy in ordinary circumstances, all messages in this call flow will be prioritized, and thus skip any queue backlogs that have arisen on any of the call flow's endpoints, e.g. due to some massive batch of (background) processes executing at the same time. Therefore, interactive. Notice that with both of these features combined, you get very fast messaging, as non-persistent means that the message will not have to be stored to permanent storage at any point, while interactive means that it will skip any backlogged queues. In addition, the noAudit flag is set, since it is a waste of storage space to archive the actual contents of Request and Reply messages that do not alter the system.
Sets the following properties on the sent Mats message:
- Non-persistent: Since it is not vitally important that this message is not lost, non-persistent messaging can be used. The minuscule chance for this message to disappear is not worth the considerable overhead of store-and-forward multiple times to persistent storage. Also, speed is much more interesting.
- Interactive: Since the Futurizer should only be used as a "synchronous bridge" when a human is actively waiting for the response, the interactive flag is set. (For all other uses, you should rather code "proper Mats" with initiations, endpoints and terminators).
- No audit: Since this message will not change the state of the system (i.e. the "GET-style" requests), using storage on auditing requests and replies is not worthwhile.
Initiates a non-persistent (unreliable), interactive (prioritized), non-audited (request and reply DTOs won't be archived) Request-message to the specified endpoint, returning a CompletableFuture that will be completed when the Reply from the requested endpoint comes back. The internal MatsFuturizer timeout will be set to 2.5 minutes, meaning that if there is no reply forthcoming within that time, the CompletableFuture will be completed exceptionally with a MatsFuturizerTimeoutException, and the Promise deleted from the futurizer. 2.5 minutes is probably too long to wait for any normal interaction with a system, so if you use the CompletableFuture.get(timeout, TimeUnit) method of the returned future, you might want to put a lower timeout there - if the answer hasn't come within that time, you'll get a TimeoutException. If you instead use the non-param variant get(), you will get an ExecutionException when the 2.5 minutes have passed (that exception's cause will be the MatsFuturizerTimeoutException mentioned above).
- Type Parameters:
T - the type of the reply DTO.
- Parameters:
traceId - TraceId of the resulting Mats call flow, see MatsInitiator.MatsInitiate.traceId(CharSequence)
from - the "from" of the initiation, see MatsInitiator.MatsInitiate.from(String)
to - to which Mats endpoint the request should go, see MatsInitiator.MatsInitiate.to(String)
replyClass - the expected reply DTO class that the requested endpoint replies with.
request - the request DTO that should be sent to the endpoint, see MatsInitiator.MatsInitiate.request(Object)
- Returns:
a CompletableFuture which will be resolved with a MatsFuturizer.Reply-instance that contains both some meta-data, and the reply from the requested endpoint.
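The two timeout outcomes described for this method follow from standard CompletableFuture behavior, which the JDK-only sketch below demonstrates. It makes no Mats calls: StandInFuturizerTimeout is a hypothetical stand-in for MatsFuturizer.MatsFuturizerTimeoutException, and the futures are completed by hand to play the role of the futurizer's internal timeout mechanism.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FutureTimeoutDemo {

    /** Hypothetical stand-in for MatsFuturizer.MatsFuturizerTimeoutException. */
    static final class StandInFuturizerTimeout extends Exception {
        StandInFuturizerTimeout(String message) { super(message); }
    }

    /** Waits with the caller's own, shorter timeout, as suggested for interactive use. */
    static String waitShort(CompletableFuture<String> future, long millis) {
        try {
            return "reply: " + future.get(millis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "caller-timeout"; // the caller gave up; the future itself is still pending
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    /** Waits without a caller timeout, relying on the future being completed by someone else. */
    static String waitUnbounded(CompletableFuture<String> future) {
        try {
            return "reply: " + future.get();
        } catch (ExecutionException e) {
            // The exception used for completeExceptionally(..) is the cause.
            return "failed: " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Case 1: no reply yet, caller uses get(timeout, unit) with a short timeout.
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(waitShort(pending, 50)); // prints caller-timeout

        // Case 2: the internal timeout fired and completed the future exceptionally.
        CompletableFuture<String> timedOut = new CompletableFuture<>();
        timedOut.completeExceptionally(new StandInFuturizerTimeout("no reply in time"));
        System.out.println(waitUnbounded(timedOut)); // prints failed: StandInFuturizerTimeout
    }
}
```

Note that a TimeoutException from get(timeout, unit) leaves the future itself untouched, so in the real setting the Promise would presumably stay registered in the futurizer until the reply arrives or the internal timeout fires.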
-
getOutstandingPromiseCount
public int getOutstandingPromiseCount()
- Returns:
the number of outstanding promises, not yet completed or timed out.
-
getCompleterThreadPool
- Returns:
the future-completer-thread-pool, for introspection. If you mess with it, you will be sorry..!
-
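_newThreadPool
protected ThreadPoolExecutor _newThreadPool(int corePoolSize, int maximumPoolSize)
-
_createPromise
protected <T> MatsFuturizer.Promise<T> _createPromise(String traceId, String from, String to, Class<T> replyClass, int timeout, TimeUnit unit)
-
_enqueuePromise
protected <T> void _enqueuePromise(MatsFuturizer.Promise<T> promise)
-
_assertFuturizerRunning
protected void _assertFuturizerRunning()
-
_sendRequestToFulfillPromise
protected <T> void _sendRequestToFulfillPromise(String from, String endpointId, String traceId, Object request, MatsInitiator.InitiateLambda extraMessageInit, MatsFuturizer.Promise<T> promise)
-
_handleRepliesForPromises
protected void _handleRepliesForPromises(MatsEndpoint.ProcessContext<Void> context, String correlationId, MatsEndpoint.MatsObject matsObject)
-
_deserializeReply
protected Object _deserializeReply(MatsEndpoint.MatsObject matsObject, Class<?> toClass)
-
_uncheckedComplete
protected void _uncheckedComplete(MatsEndpoint.ProcessContext<Void> context, Object replyObject, MatsFuturizer.Promise<?> promise)
-
_startTimeouterThread
protected void _startTimeouterThread()
-
_timeoutCompleteExceptionally
protected void _timeoutCompleteExceptionally(MatsFuturizer.Promise<?> promise, String msg)
-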
close
public void close()
Closes the MatsFuturizer. Notice: Spring will also notice this method if the MatsFuturizer is registered as a @Bean, and will register it as a destroy method.
- Specified by:
close in interface AutoCloseable
-