public class MatsFuturizer
extends java.lang.Object
implements java.lang.AutoCloseable
singletonFuturizer.futurizeNonessential(...)
(or
futurize(...)
for full
configurability), specifying which Mats Endpoint to invoke and the request DTO instance, and then you get a
CompletableFuture
in return. This future will complete once the invoked Mats Endpoint replies.
It is extremely important to understand that this is NOT how you compose multiple Mats Endpoints together! This is
ONLY supposed to be used when you are in a synchronous context (e.g. in a Servlet, or a Spring @RequestMapping), and
want to interact with the Mats fabric of services.
A question you should ask yourself, is how this works in a multi-node setup? For a Mats flow, it does not matter
which node a given stage of a MatsEndpoint is performed, as it is by design totally stateless wrt. the executing
node, as all state resides in the message. However, for a synchronous situation as in a HTTP request, it definitely
matters that the final reply, the one that should complete the returned future, comes in on the same node that issued
the request, as this is where the CompletableFuture instance is, and where the waiting TCP connection is connected!
The trick here is that the final reply is specified to come in on a node-specific Topic, i.e. it literally has
the node name (default being the hostname) as a part of the MatsEndpoint name, and it is a
SubscriptionTerminator
.
Another aspect to understand, is that while Mats "guarantees" that a submitted initiation will flow through the Mats
endpoints, no matter what happens with the processing nodes (unless you employ NonPersistent messaging,
which futurizeNonessential(..) does!), nothing can be guaranteed wrt. the completion of the future: This is
stateful processing. The node where the MatsFuturizer initiation is performed can crash right after the message has
been put on the Mats fabric, and hence the CompletableFuture vanishes along with everything else on that node. The
mats flow is however already in motion, and will be executed - but when the Reply comes in on the node-specific
Topic, there is no longer any corresponding CompletableFuture to complete. This is also why you should not compose
Mats endpoints using this familiar feeling that a CompletableFuture probably gives you: While a multi-stage
MatsEndpoint is asynchronous, resilient and highly available and each stage is transactionally performed, with
retries and all the goodness that comes with a message oriented architecture, once you rely on a CompletableFuture,
you are in a synchronous world where a power outage or a reboot can stop the processing midway. Thus, the
MatsFuturizer should always just be employed out the very outer edge facing the actual client - any other processing
should be performed using MatsEndpoints, and composition of MatsEndpoints should be done using multi-stage
MatsEndpoints.
Note that in the case of pure "GET-style" requests where information is only retrieved and no state in the total
system is changed, everything is a bit more relaxed: If a processing fails, the worst thing that happens is a
slightly annoyed user. But if this was an "add order" or "move money" instruction from the user, a mid-processing
failure is rather bad and could require human intervention to clean up. Thus, the
futurizeNonessential(..)
method should only be employed for such "GET-style" requests, and any other
potentially state changing operations must employ the generic futurize(..)
method.Modifier and Type | Class and Description |
---|---|
static class |
MatsFuturizer.MatsFuturizerTimeoutException
This exception is raised through the
CompletableFuture if the timeout specified when getting the
CompletableFuture is reached (to get yourself a future, use one of the
futurizeXYZ(..) methods). |
protected static class |
MatsFuturizer.Promise<T> |
static class |
MatsFuturizer.Reply<T>
An instance of this class will be the return value of any
CompletableFuture s created with the
MatsFuturizer . |
Modifier and Type | Field and Description |
---|---|
protected java.util.HashMap<java.lang.String,MatsFuturizer.Promise<?>> |
_correlationIdToPromiseMap |
protected java.util.concurrent.ThreadPoolExecutor |
_futureCompleterThreadPool |
protected java.util.concurrent.locks.ReentrantLock |
_internalStateLock |
protected MatsFactory |
_matsFactory |
protected MatsInitiator |
_matsInitiator |
protected int |
_maxOutstandingPromises |
protected MatsFuturizer.Promise<?> |
_nextInLineToTimeout |
protected MatsEndpoint<java.lang.Void,java.lang.String> |
_replyHandlerEndpoint |
protected boolean |
_replyHandlerEndpointStarted |
protected boolean |
_runFlag |
protected java.lang.String |
_terminatorEndpointId |
protected java.util.concurrent.atomic.AtomicInteger |
_threadNumber |
protected java.util.concurrent.locks.Condition |
_timeouterPing_InternalStateLock |
protected java.util.PriorityQueue<MatsFuturizer.Promise<?>> |
_timeoutSortedPromises |
Modifier | Constructor and Description |
---|---|
protected |
MatsFuturizer(MatsFactory matsFactory,
java.lang.String endpointIdPrefix,
int corePoolSize,
int maxPoolSize,
int maxOutstandingPromises) |
Modifier and Type | Method and Description |
---|---|
protected void |
_assertFuturizerRunning() |
protected <T> MatsFuturizer.Promise<T> |
_createPromise(java.lang.String traceId,
java.lang.String from,
java.lang.String to,
java.lang.Class<T> replyClass,
int timeout,
java.util.concurrent.TimeUnit unit) |
protected java.lang.Object |
_deserializeReply(MatsEndpoint.MatsObject matsObject,
java.lang.Class<?> toClass) |
protected <T> void |
_enqueuePromise(MatsFuturizer.Promise<T> promise) |
protected void |
_handleRepliesForPromises(MatsEndpoint.ProcessContext<java.lang.Void> context,
java.lang.String correlationId,
MatsEndpoint.MatsObject matsObject) |
protected java.util.concurrent.ThreadPoolExecutor |
_newThreadPool(int corePoolSize,
int maximumPoolSize) |
protected <T> void |
_sendRequestToFulfillPromise(java.lang.String from,
java.lang.String endpointId,
java.lang.String traceId,
java.lang.Object request,
MatsInitiator.InitiateLambda extraMessageInit,
MatsFuturizer.Promise<T> promise) |
protected void |
_startTimeouterThread() |
protected void |
_timeoutCompleteExceptionally(MatsFuturizer.Promise<?> promise,
java.lang.String msg) |
protected void |
_uncheckedComplete(MatsEndpoint.ProcessContext<java.lang.Void> context,
java.lang.Object replyObject,
MatsFuturizer.Promise<?> promise) |
void |
close()
Closes the MatsFuturizer.
|
static MatsFuturizer |
createMatsFuturizer(MatsFactory matsFactory)
Creates a MatsFuturizer, and you should only need one per MatsFactory (which again mostly means one per
application or micro-service or JVM).
|
static MatsFuturizer |
createMatsFuturizer(MatsFactory matsFactory,
java.lang.String endpointIdPrefix)
Creates a MatsFuturizer, and you should only need one per MatsFactory (which again mostly means one per
application or micro-service or JVM).
|
static MatsFuturizer |
createMatsFuturizer(MatsFactory matsFactory,
java.lang.String endpointIdPrefix,
int corePoolSize,
int maxPoolSize,
int maxOutstandingPromises)
Creates a MatsFuturizer, and you should only need one per MatsFactory (which again mostly means one per
application or micro-service or JVM).
|
<T> java.util.concurrent.CompletableFuture<MatsFuturizer.Reply<T>> |
futurize(java.lang.String traceId,
java.lang.String from,
java.lang.String to,
java.lang.Class<T> replyClass,
java.lang.Object request,
MatsInitiator.InitiateLambda customInit)
Convenience-variant of the generic
futurize(..) form, where
the timeout is set to 2.5 minutes. |
<T> java.util.concurrent.CompletableFuture<MatsFuturizer.Reply<T>> |
futurize(java.lang.String traceId,
java.lang.String from,
java.lang.String to,
int timeout,
java.util.concurrent.TimeUnit unit,
java.lang.Class<T> replyClass,
java.lang.Object request,
MatsInitiator.InitiateLambda customInit)
The generic form of initiating a request-message that returns a
CompletableFuture , which enables you to
tailor all properties. |
<T> java.util.concurrent.CompletableFuture<MatsFuturizer.Reply<T>> |
futurizeNonessential(java.lang.String traceId,
java.lang.String from,
java.lang.String to,
java.lang.Class<T> replyClass,
java.lang.Object request)
NOTICE: This variant must only be used for "GET-style" Requests where none of the endpoints the call
flow passes will add, remove or alter any state of the system, and where it doesn't matter all that much if a
message (and hence the Mats flow) is lost!
The goal of this method is to be able to get hold of e.g.
|
java.util.concurrent.ThreadPoolExecutor |
getCompleterThreadPool() |
int |
getOutstandingPromiseCount() |
protected final MatsFactory _matsFactory
protected final MatsInitiator _matsInitiator
protected final java.lang.String _terminatorEndpointId
protected final java.util.concurrent.ThreadPoolExecutor _futureCompleterThreadPool
protected final int _maxOutstandingPromises
protected final MatsEndpoint<java.lang.Void,java.lang.String> _replyHandlerEndpoint
protected final java.util.concurrent.atomic.AtomicInteger _threadNumber
protected volatile boolean _replyHandlerEndpointStarted
protected final java.util.concurrent.locks.ReentrantLock _internalStateLock
protected final java.util.concurrent.locks.Condition _timeouterPing_InternalStateLock
protected final java.util.HashMap<java.lang.String,MatsFuturizer.Promise<?>> _correlationIdToPromiseMap
protected final java.util.PriorityQueue<MatsFuturizer.Promise<?>> _timeoutSortedPromises
protected MatsFuturizer.Promise<?> _nextInLineToTimeout
protected volatile boolean _runFlag
protected MatsFuturizer(MatsFactory matsFactory, java.lang.String endpointIdPrefix, int corePoolSize, int maxPoolSize, int maxOutstandingPromises)
public static MatsFuturizer createMatsFuturizer(MatsFactory matsFactory)
createMatsFuturizer(MatsFactory, String)
, but with this variant also the
'endpointIdPrefix' is set to what is returned by matsFactory.getFactoryConfig().getAppName()
.
Note that if you - against the above suggestion - create more than one MatsFuturizer for a MatsFactory, then
you MUST give them different endpointIdPrefixes, thus you cannot use this method!matsFactory
- the underlying MatsFactory
on which outgoing messages will be sent, and on which the receiving
SubscriptionTerminator
will be created.MatsFuturizer
, which is tied to a newly created
SubscriptionTerminator
.public static MatsFuturizer createMatsFuturizer(MatsFactory matsFactory, java.lang.String endpointIdPrefix)
matsFactory.getFactoryConfig().getConcurrency()
returns at creation time x
4 for "corePoolSize", but at least 5, (i.e. "min"); and concurrency * 20, but at least 100, for "maximumPoolSize"
(i.e. max). The pool is set up to let non-core threads expire after 5 minutes. The maximum number of outstanding
promises is set to 50k.matsFactory
- the underlying MatsFactory
on which outgoing messages will be sent, and on which the receiving
SubscriptionTerminator
will be created.endpointIdPrefix
- the first part of the endpointId, which typically should be some "class-like" construct denoting the
service name, like "OrderService" or "InventoryService", preferably the same prefix you use for all
your other endpoints running on this same service. Note: If you create multiple MatsFuturizers for
a MatsFactory, this parameter must be different for each instance!MatsFuturizer
, which is tied to a newly created
SubscriptionTerminator
.public static MatsFuturizer createMatsFuturizer(MatsFactory matsFactory, java.lang.String endpointIdPrefix, int corePoolSize, int maxPoolSize, int maxOutstandingPromises)
Integer.MAX_VALUE
.matsFactory
- the underlying MatsFactory
on which outgoing messages will be sent, and on which the receiving
SubscriptionTerminator
will be created.endpointIdPrefix
- the first part of the endpointId, which typically should be some "class-like" construct denoting the
service name, like "OrderService" or "InventoryService", preferably the same prefix you use for all
your other endpoints running on this same service. Note: If you create multiple MatsFuturizers for
a MatsFactory, this parameter must be different for each instance!corePoolSize
- the minimum number of threads in the future-completer-pool of threads.maxPoolSize
- the maximum number of threads in the future-completer-pool of threads.maxOutstandingPromises
- the maximum number of outstanding Promises before new are rejected. Should be a fairly high number,
e.g. the default of createMatsFuturizer(MatsFactory, String)
is 50k.MatsFuturizer
, which is tied to a newly created
SubscriptionTerminator
.public <T> java.util.concurrent.CompletableFuture<MatsFuturizer.Reply<T>> futurize(java.lang.String traceId, java.lang.String from, java.lang.String to, int timeout, java.util.concurrent.TimeUnit unit, java.lang.Class<T> replyClass, java.lang.Object request, MatsInitiator.InitiateLambda customInit)
CompletableFuture
, which enables you to
tailor all properties. To set interactive-, nonPersistent- or noAudit-flags, or to tack on any
"sideloads"
to the outgoing message, use the "customInit"
parameter, which directly is the InitiateLambda
that the MatsFuturizer initiation is
using.
For a bit more explanation, please read JavaDoc of
futurizeInteractiveUnreliable(..)
T
- the type of the reply DTO.traceId
- TraceId of the resulting Mats call flow, see MatsInitiator.MatsInitiate.traceId(CharSequence)
from
- the "from" of the initiation, see MatsInitiator.MatsInitiate.from(String)
to
- to which Mats endpoint the request should go, see MatsInitiator.MatsInitiate.to(String)
timeout
- how long before the internal timeout-mechanism of MatsFuturizer kicks in and the future is
completed exceptionally
with a
MatsFuturizer.MatsFuturizerTimeoutException
.unit
- the unit of time of the 'timeout' parameter.replyClass
- which expected reply DTO class that the requested endpoint replies with.request
- the request DTO that should be sent to the endpoint, see MatsInitiator.MatsInitiate.request(Object)
customInit
- the MatsInitiator.InitiateLambda
that the MatsFuturizer is employing to initiate the outgoing message, which
you can use to tailor the message, e.g. setting the interactive
-flag or tacking on "sideloads"
.CompletableFuture
which will be resolved with a MatsFuturizer.Reply
-instance that contains both some
meta-data, and the reply
from the requested endpoint.public <T> java.util.concurrent.CompletableFuture<MatsFuturizer.Reply<T>> futurize(java.lang.String traceId, java.lang.String from, java.lang.String to, java.lang.Class<T> replyClass, java.lang.Object request, MatsInitiator.InitiateLambda customInit)
futurize(..)
form, where
the timeout is set to 2.5 minutes. To set interactive-, nonPersistent- or noAudit-flags, or to tack on any
"sideloads"
to the outgoing message, use the "customInit"
parameter, which directly is the InitiateLambda
that the MatsFuturizer initiation is
using.
For a bit more explanation, please read JavaDoc of
futurizeInteractiveUnreliable(..)
T
- the type of the reply DTO.traceId
- TraceId of the resulting Mats call flow, see MatsInitiator.MatsInitiate.traceId(CharSequence)
from
- the "from" of the initiation, see MatsInitiator.MatsInitiate.from(String)
to
- to which Mats endpoint the request should go, see MatsInitiator.MatsInitiate.to(String)
the unit of time of
the 'timeout' parameter.replyClass
- which expected reply DTO class that the requested endpoint replies with.request
- the request DTO that should be sent to the endpoint, see MatsInitiator.MatsInitiate.request(Object)
customInit
- the MatsInitiator.InitiateLambda
that the MatsFuturizer is employing to initiate the outgoing message, which
you can use to tailor the message, e.g. setting the interactive
-flag or tacking on "sideloads"
.CompletableFuture
which will be resolved with a MatsFuturizer.Reply
-instance that contains both some
meta-data, and the reply
from the requested endpoint.public <T> java.util.concurrent.CompletableFuture<MatsFuturizer.Reply<T>> futurizeNonessential(java.lang.String traceId, java.lang.String from, java.lang.String to, java.lang.Class<T> replyClass, java.lang.Object request)
non-persistent
(unreliable),
interactive
(prioritized), non-audited
(request and reply DTOs won't be archived) Request-message to the specified endpoint, returning
a CompletableFuture
that will be completed
when the Reply from
the requested endpoint comes back. The internal MatsFuturizer timeout will be set to 2.5 minutes, meaning
that if there is no reply forthcoming within that time, the CompletableFuture
will be
completed exceptionally
with a
MatsFuturizerTimeoutException
, and the Promise deleted from the futurizer.
2.5 minutes is probably too long to wait for any normal interaction with a system, so if you use the
CompletableFuture.get(timeout, TimeUnit)
method of the returned
future, you might want to put a lower timeout there - if the answer hasn't come within that time, you'll get a
TimeoutException
. If you instead use the non-param variant get()
, you
will get an ExecutionException
when the 2.5 minutes have passed (that exception's
cause
will be the MatsFuturizerTimeoutException
mentioned above).T
- the type of the reply DTO.traceId
- TraceId of the resulting Mats call flow, see MatsInitiator.MatsInitiate.traceId(CharSequence)
from
- the "from" of the initiation, see MatsInitiator.MatsInitiate.from(String)
to
- to which Mats endpoint the request should go, see MatsInitiator.MatsInitiate.to(String)
replyClass
- which expected reply DTO class that the requested endpoint replies with.request
- the request DTO that should be sent to the endpoint, see MatsInitiator.MatsInitiate.request(Object)
CompletableFuture
which will be resolved with a MatsFuturizer.Reply
-instance that contains both some
meta-data, and the reply
from the requested endpoint.public int getOutstandingPromiseCount()
public java.util.concurrent.ThreadPoolExecutor getCompleterThreadPool()
protected java.util.concurrent.ThreadPoolExecutor _newThreadPool(int corePoolSize, int maximumPoolSize)
protected <T> MatsFuturizer.Promise<T> _createPromise(java.lang.String traceId, java.lang.String from, java.lang.String to, java.lang.Class<T> replyClass, int timeout, java.util.concurrent.TimeUnit unit)
protected <T> void _enqueuePromise(MatsFuturizer.Promise<T> promise)
protected void _assertFuturizerRunning()
protected <T> void _sendRequestToFulfillPromise(java.lang.String from, java.lang.String endpointId, java.lang.String traceId, java.lang.Object request, MatsInitiator.InitiateLambda extraMessageInit, MatsFuturizer.Promise<T> promise)
protected void _handleRepliesForPromises(MatsEndpoint.ProcessContext<java.lang.Void> context, java.lang.String correlationId, MatsEndpoint.MatsObject matsObject)
protected java.lang.Object _deserializeReply(MatsEndpoint.MatsObject matsObject, java.lang.Class<?> toClass)
protected void _uncheckedComplete(MatsEndpoint.ProcessContext<java.lang.Void> context, java.lang.Object replyObject, MatsFuturizer.Promise<?> promise)
protected void _startTimeouterThread()
protected void _timeoutCompleteExceptionally(MatsFuturizer.Promise<?> promise, java.lang.String msg)
public void close()
@Bean
, and will register it as a destroy method.close
in interface java.lang.AutoCloseable