public abstract class BlockingMessenger
Extends:
AbstractMessenger
This class is a near-drop-in replacement for the previous BlockingMessenger class.
To subclassers (that is, currently, transports) the only difference is that some
overloaded methods have a different name (class hierarchy reasons made it impossible
to preserve the names without forcing an API change for applications).
The other difference which is not API visible, is that it implements the
standard MessengerState behaviour and semantics required by the changes in the endpoint framework.
This the only base messenger class meant to be extended by outside code that is in the impl tree. The
reason being that what it replaces was there already and that new code should not become dependant upon it.
| Inner Class Summary |
private final static class |
The implementation of channel messenger that getChannelMessenger returns:
All it does is address rewritting. |
private final static class |
Our statemachine implementation; just connects the standard AbstractMessengerState action methods to
this object. |
| Field Summary |
private final static int |
Must report failure to connect. |
private final static int |
No action deferred. |
private final static int |
Must send the current message. |
private Message |
The outstanding message. |
private String |
The serviceParam override for that message. |
private String |
The serviceName override for that message. |
private Throwable |
The exception that caused that message to not be sent. |
private int |
The current deferred action. |
private final PeerGroupID |
Need to know which group this transport lives in, so that we can suppress
channel redirection when in the same group. |
private boolean |
true if we have deliberately closed our one message input queue. |
private boolean |
legacy artefact: transports need to believe the messenger is not yet closed in order to actually close it. |
private final static Logger |
Log4J Logger |
private Object |
Reference to owning object. |
private TimerTask |
The timer task watching over our self destruction requirement. |
|
State lock and engine. |
private static Timer |
The self destruct timer. |
| Constructor Summary |
public |
BlockingMessenger(PeerGroupID homeGroupID, EndpointAddress dest, boolean selfDestruct) Constructor. |
public |
BlockingMessengerChannel(EndpointAddress baseAddress, PeerGroupID redirection, String origService, String origServiceParam) |
protected |
|
| Method Summary |
public void |
|
public EndpointAddress |
|
public int |
|
public void |
|
protected void |
Always make sure we're registered with the shared messenger |
public void |
|
public void |
sendMessageB(Message msg, String service, String serviceParam) |
public boolean |
sendMessageN(Message msg, String service, String serviceParam) |
protected void |
|
protected void |
|
protected void |
|
protected void |
|
protected void |
|
private void |
Performs the ACTION_CONNECT deferred action: generate a downEvent since we cannot reconnect. |
public final void |
{@inheritDoc}
Some transports historically overload the close method of BlockingMessenger. |
protected abstract void |
Close connection. |
private int |
A shortHand for a frequently used sequence. |
public final Messenger |
|
protected EndpointAddress |
A trivial convenience method that transports still depend upon. |
public final EndpointAddress |
{@inheritDoc}
getLogicalDestinationAddress() requires resolution (it's the address advertised by the other end). |
protected abstract EndpointAddress |
Obtain the logical destination address from the implementer (a transport for example). |
public final int |
|
public boolean |
We overload isClosed because many messengers still invoke super.isClosed() for their own implementation
and they expect it to be true only when all is shutdown; not while we're closing gently. |
protected abstract boolean |
return true if this messenger has not been used for a long time. |
private void |
Three exposed methods may need to inject new events in the system: sendMessageN, close, and shutdown. |
public final void |
|
private void |
Performs the ACTION_SEND deferred action: sends the one msg in our one msg queue. |
public void |
sendMessageB(Message msg, String service, String serviceParam) {@inheritDoc} |
protected abstract boolean |
send message. |
public final boolean |
sendMessageN(Message msg, String service, String serviceParam) |
public void |
Sets an owner for this blocking messenger. |
protected final void |
A transport may call this to cause an orderly closure of its messengers. |
private void |
|
private final static int ACTION_CONNECT = 2
Must report failure to connect.
private final static int ACTION_NONE = 0
No action deferred.
private final static int ACTION_SEND = 1
Must send the current message.
private Message currentMessage = null
The outstanding message.
private String currentParam = null
The serviceParam override for that message.
private String currentService = null
The serviceName override for that message.
private Throwable currentThrowable = null
The exception that caused that message to not be sent.
private int deferredAction = ACTION_NONE
The current deferred action.
private final PeerGroupID homeGroupID
Need to know which group this transport lives in, so that we can suppress
channel redirection when in the same group. This is currently the norm.
private boolean inputClosed = false
true if we have deliberately closed our one message input queue.
private boolean lieToOldTransports = false
legacy artefact: transports need to believe the messenger is not yet closed in order to actually close it.
So we lie to them just while we run their closeImpl method so that they do not see that the messenger is
officially closed.
private final static Logger LOG = Logger.getLogger(BlockingMessenger.class.getName())
Log4J Logger
private Object owner = null
Reference to owning object. This is there so that the owning object is not subject to garbage collection
unless this object here becomes itself unreferenced. That happens when the self destruct timer closed it.
private TimerTask selfDestructTask = null
The timer task watching over our self destruction requirement.
State lock and engine.
private static Timer timer
The self destruct timer.
When this messenger has become idle, it is closed. As a side effect, it makes the owning canonical messenger,
if any, subject to removal if it is otherwise unreferenced.
public BlockingMessenger(PeerGroupID homeGroupID, EndpointAddress dest, boolean selfDestruct)
Constructor.
We start in the CONNECTED state, we pretend to have a queue of size 1, and we can never re-connect. Although this
messenger fully respects the asynchronous semantics, it is saturated as soon as one msg is being send, and if not
saturated, send is actually performed by the invoker thread. So return is not completely immediate. This is a barely
acceptable implementation, but this is also a transition adapter that is bound to disappear one release from now. The main
goal is to get things going until transports are adapted.
Parameters:
homeGroupID the group that this messenger works for. This is the group of the endpoint service or transport
that created this messenger.
dest where messages should be addressed to
selfDestruct true if this messenger must self close destruct when idle. Warning: If selfDestruct is used,
this messenger will remained referenced for as long as isIdleImpl returns false.
private void cantConnect()
Performs the ACTION_CONNECT deferred action: generate a downEvent since we cannot reconnect.
public final void close()
{@inheritDoc}
Some transports historically overload the close method of BlockingMessenger.
The final is there to make sure we know about it. However, there should be no
harm done if the unchanged code is renamed to closeImpl; even if it calls super.close().
The real problem, however, is transports calling close (was their own, but now it means this one), when
they want to break. It will make things look like someone just called close, but it will not
actually break anything. However, that will cause the state machine to go through the close process.
this will end up calling closeImpl(). That will do.
protected abstract void closeImpl()
Close connection. May fail current send.
private int eventCalled()
A shortHand for a frequently used sequence. MUST be called while synchronized on stateMachine.
Return:
the deferred action.
public final Messenger getChannelMessenger(PeerGroupID redirection, String service, String serviceParam)
protected EndpointAddress getDestAddressToUse(String service, String serviceParam)
A trivial convenience method that transports still depend upon.
The reason it exists is that it used to be non-trivial, when
the group redirection would sometimes be done at this point (the
transports could be asked to send to the non-mangled service and
param, when the application used the implicit defaults). This is
no-longer true: the transport (the blocking messenger) is always
invoked with fully defaulted and mangled service name and param. So
all we have to do is to paste them all together. Eventually blocking
messenger could simply be invoked with an already computed
full destination.
getLogicalDestinationAddress
public final EndpointAddress getLogicalDestinationAddress()
{@inheritDoc}
getLogicalDestinationAddress() requires resolution (it's the address advertised by the other end).
For a blocking messenger it's easy. We're born resolved. So, just ask the implementor what it is.
getLogicalDestinationImpl
protected abstract EndpointAddress getLogicalDestinationImpl()
Obtain the logical destination address from the implementer (a transport for example).
public final int getState()
public boolean isClosed()
We overload isClosed because many messengers still invoke super.isClosed() for their own implementation
and they expect it to be true only when all is shutdown; not while we're closing gently.
FIXME - jice@jxta.org 20040413: transports should get a deeper retrofit eventually.
protected abstract boolean isIdleImpl()
return true if this messenger has not been used for a long time. The definition of long time is: "so long that closing it
is worth the risk of having to re-open". A messenger should self close if it thinks it meets the definition of
idle. BlockingMessenger leaves the evaluation to the transport but does the work automatically. Important: if
self destruction is used, this method must work; not just return false. See the constructor. In general, if closeImpl does
not need to do anyhing, then self destruction is not needed.
private void performDeferredAction(int action)
Three exposed methods may need to inject new events in the system: sendMessageN, close, and shutdown. Since they can both
cause actions, and since connectAction and startAction are deferred, it seems possible that one of the
actions caused by send, close, or shutdown be called while connectAction or startAction are in progress.
However, the state machine gives us a few guarantees: connectAction and startAction can never nest. We will not be
asked to perform one while still performing the other. Only the synchronous actions closeInput, closeOutput, or
failAll can possibly be requested in the interval. We could make more assumptions and simplify the code, but rather
keep at least some flexibility.
public final void resolve()
private void sendIt()
Performs the ACTION_SEND deferred action: sends the one msg in our one msg queue.
This method *never* sets the outcome message property. This is left to sendMessageN and sendMessageB, because
sendMessageB does not want to set it in any other case than success, while sendMessageN does it in all cases.
The problem with that is: how do we communicate the outcome to sendMessage{NB} without having to keep
the 1 msg queue locked until then (which would be in contradiction with how we interract with the state machine).
To make it realy inexpenssive, here's the trick: when a message fails currentMessage and currentFailure remain.
So the sendMessageRoutine can check them and known that it is its message and not another one that caused the
failure. If all is well, currentMessage and currentFailure are nulled and if another message is send immediately
sendMessage is able to see that its own message was processed fully. (this is a small cheat regarding the
state of saturation after failall, but that's not actually detectable from the outside: input is closed
before failall anyway. See failall for that part.
public void sendMessageB(Message msg, String service, String serviceParam)
{@inheritDoc}
Throws:
IOException
protected abstract boolean sendMessageBImpl(Message message, String service, String param)
send message. block as needed. throw IOException or runtime exception as needed.
The boolean return value is for historical reasons: so that transports just need to rename their sendMessage() methods.
No need to change a bit of the code.
Throws:
IOException
public final boolean sendMessageN(Message msg, String service, String serviceParam)
public void setOwner(Object owner)
Sets an owner for this blocking messenger. Owners are normally canonical messengers. The goal of registering the owner is
to keep that owner reachable as long as this blocking messenger is. Canonical messengers are otherwise softly referenced,
and so, may be deleted whenever memory is tight.
We do not want to use finalizers or the equivalent reference queue mechanism; so we have no idea when a blocking messenger
is no-longer referenced by any canonical. In addition it may be expensive to make and so we want to keep it for a while
anyway. As a result, instead of keeping a blocking messenger open as long as there is a canonical, we do the opposite: we
keep the canonical (owner, here) as long as the blocking messenger is open (and usually beyond, memory allowing). How long
a blocking messenger will stay around depends upon that messenger's implementation. That may even be left up to the GC, in
the end (if close is not needed AND the messenger is cheap to make). In that case, the owner is likely the only referer,
and so both will have the same lifetime.
Parameters:
owner The object that should be kept referenced at least as long as this one.
protected final void shutdown()
A transport may call this to cause an orderly closure of its messengers.
private void storeCurrent(Message msg, String service, String param)