Class BlockingMessenger
package: src.net.jxta.impl.endpoint
net.jxta.impl.endpoint.BlockingMessenger
public abstract class BlockingMessenger
Extends:
AbstractMessenger
This class is a near drop-in replacement for the previous BlockingMessenger class. To subclassers (currently, that means transports) the only difference is that some overloaded methods have a different name (class hierarchy reasons made it impossible to preserve the names without forcing an API change for applications). The other difference, which is not visible in the API, is that it implements the standard MessengerState behaviour and semantics required by the changes in the endpoint framework. This is the only base messenger class in the impl tree that is meant to be extended by outside code. The reason is that the class it replaces was already there; new code should not become dependent upon it.
Inner Class Summary
private final static class
   The implementation of channel messenger that getChannelMessenger returns: all it does is address rewriting.
 
private final static class
   Our state machine implementation; just connects the standard AbstractMessengerState action methods to this object.
 
Field Summary
private final static int
   Must report failure to connect.
 
private final static int
   No action deferred.
 
private final static int
   Must send the current message.
 
private Message
   The outstanding message.
 
private String
   The serviceParam override for that message.
 
private String
   The serviceName override for that message.
 
private Throwable
   The exception that caused that message to not be sent.
 
private int
   The current deferred action.
 
private final PeerGroupID
   Need to know which group this transport lives in, so that we can suppress channel redirection when in the same group.
 
private boolean
   true if we have deliberately closed our one message input queue.
 
private boolean
   legacy artefact: transports need to believe the messenger is not yet closed in order to actually close it.
 
private final static Logger
   Log4J Logger
 
private Object
   Reference to owning object.
 
private TimerTask
   The timer task watching over our self destruction requirement.
 
   State lock and engine.
 
private static Timer
   The self destruct timer.
 
Constructor Summary
public
BlockingMessenger(PeerGroupID homeGroupID, EndpointAddress dest, boolean selfDestruct)
   Constructor.
 
public
BlockingMessengerChannel(EndpointAddress baseAddress, PeerGroupID redirection, String origService, String origServiceParam)
protected
Method Summary
public void
public EndpointAddress
public int
public void
itemChanged(Object changedObject)
protected void
registerListener(SimpleSelectable l)
   Always make sure we're registered with the shared messenger
 
public void
public void
sendMessageB(Message msg, String service, String serviceParam)
public boolean
sendMessageN(Message msg, String service, String serviceParam)
protected void
protected void
protected void
protected void
protected void
private void
   Performs the ACTION_CONNECT deferred action: generate a downEvent since we cannot reconnect.
 
public final void
   {@inheritDoc}

Some transports historically overload the close method of BlockingMessenger.

 
protected abstract void
   Close connection.
 
private int
   A shorthand for a frequently used sequence.
 
public final Messenger
getChannelMessenger(PeerGroupID redirection, String service, String serviceParam)
protected EndpointAddress
getDestAddressToUse(String service, String serviceParam)
   A trivial convenience method that transports still depend upon.
 
public final EndpointAddress
   {@inheritDoc}

getLogicalDestinationAddress() requires resolution (it's the address advertised by the other end).

 
protected abstract EndpointAddress
   Obtain the logical destination address from the implementer (a transport for example).
 
public final int
public boolean
   We overload isClosed because many messengers still invoke super.isClosed() for their own implementation and they expect it to be true only when all is shutdown; not while we're closing gently.
 
protected abstract boolean
   Returns true if this messenger has not been used for a long time.
 
private void
   Three exposed methods may need to inject new events in the system: sendMessageN, close, and shutdown.
 
public final void
private void
   Performs the ACTION_SEND deferred action: sends the one msg in our one msg queue.
 
public void
sendMessageB(Message msg, String service, String serviceParam)
   {@inheritDoc}
 
protected abstract boolean
sendMessageBImpl(Message message, String service, String param)
   Sends the message.
 
public final boolean
sendMessageN(Message msg, String service, String serviceParam)
public void
setOwner(Object owner)
   Sets an owner for this blocking messenger.
 
protected final void
   A transport may call this to cause an orderly closure of its messengers.
 
private void
storeCurrent(Message msg, String service, String param)
Field Detail
ACTION_CONNECT
private final static int ACTION_CONNECT = 2
Must report failure to connect.

ACTION_NONE
private final static int ACTION_NONE = 0
No action deferred.

ACTION_SEND
private final static int ACTION_SEND = 1
Must send the current message.

currentMessage
private Message currentMessage = null
The outstanding message.

currentParam
private String currentParam = null
The serviceParam override for that message.

currentService
private String currentService = null
The serviceName override for that message.

currentThrowable
private Throwable currentThrowable = null
The exception that caused that message to not be sent.

deferredAction
private int deferredAction = ACTION_NONE
The current deferred action.

homeGroupID
private final PeerGroupID homeGroupID
Need to know which group this transport lives in, so that we can suppress channel redirection when in the same group. This is currently the norm.

inputClosed
private boolean inputClosed = false
true if we have deliberately closed our one message input queue.

lieToOldTransports
private boolean lieToOldTransports = false
legacy artefact: transports need to believe the messenger is not yet closed in order to actually close it. So we lie to them just while we run their closeImpl method so that they do not see that the messenger is officially closed.

LOG
private final static Logger LOG = Logger.getLogger(BlockingMessenger.class.getName())
Log4J Logger

owner
private Object owner = null
Reference to owning object. This is there so that the owning object is not subject to garbage collection unless this object itself becomes unreferenced. That happens when the self destruct timer closes it.

selfDestructTask
private TimerTask selfDestructTask = null
The timer task watching over our self destruction requirement.

stateMachine
private BlockingMessenger.BlockingMessengerState stateMachine = new BlockingMessengerState()
State lock and engine.

timer
private static Timer timer
The self destruct timer. When this messenger has become idle, it is closed. As a side effect, it makes the owning canonical messenger, if any, subject to removal if it is otherwise unreferenced.
Constructor Detail
BlockingMessenger
public BlockingMessenger(PeerGroupID homeGroupID, EndpointAddress dest, boolean selfDestruct)
Constructor. We start in the CONNECTED state, we pretend to have a queue of size 1, and we can never re-connect. Although this messenger fully respects the asynchronous semantics, it is saturated as soon as one message is being sent, and if it is not saturated, the send is actually performed by the invoker thread, so the return is not completely immediate. This is a barely acceptable implementation, but it is also a transition adapter that is bound to disappear one release from now. The main goal is to get things going until transports are adapted.
Parameters:
homeGroupID the group that this messenger works for. This is the group of the endpoint service or transport that created this messenger.
dest where messages should be addressed to
selfDestruct true if this messenger must self-destruct (close itself) when idle. Warning: if selfDestruct is used, this messenger will remain referenced for as long as isIdleImpl returns false.
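The "queue of size 1" behaviour described above can be pictured with a minimal sketch. OneSlotSender and its Transport interface are hypothetical stand-ins invented for this illustration; they are not JXTA classes, and the real messenger drives this through its state machine rather than a plain lock.

```java
// Hypothetical stand-in, not the JXTA class: a messenger with a one-message "queue".
class OneSlotSender {
    /** Hypothetical transport hook; plays the role of sendMessageBImpl. */
    interface Transport {
        void send(String msg) throws Exception;
    }

    private final Object lock = new Object();
    private final Transport transport;
    private String current = null; // the single outstanding message, if any

    OneSlotSender(Transport transport) {
        this.transport = transport;
    }

    /** Non-blocking style send: returns false if saturated or if the send failed. */
    boolean sendN(String msg) {
        synchronized (lock) {
            if (current != null) {
                return false; // saturated: one message is already being sent
            }
            current = msg;
        }
        try {
            // The send runs on the caller's thread, so the call is not truly immediate.
            transport.send(msg);
            return true;
        } catch (Exception e) {
            return false;
        } finally {
            synchronized (lock) {
                current = null; // the slot is free again
            }
        }
    }
}
```

In this sketch a second sendN issued while the first is still inside transport.send is refused, which is the saturation the constructor notes describe.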
Method Detail
cantConnect
private void cantConnect()
Performs the ACTION_CONNECT deferred action: generate a downEvent since we cannot reconnect.

close
public final void close()
{@inheritDoc}

Some transports historically overload the close method of BlockingMessenger. The final modifier is there to make sure we know about it. However, no harm should be done if the unchanged code is renamed to closeImpl, even if it calls super.close(). The real problem is transports calling close (formerly their own, but now this one) when they want to break the connection. That will make things look as if someone just called close, but it will not actually break anything. It will, however, cause the state machine to go through the close process, which ends up calling closeImpl(). That will do.


closeImpl
protected abstract void closeImpl()
Close connection. May fail current send.

eventCalled
private int eventCalled()
A shorthand for a frequently used sequence. MUST be called while synchronized on stateMachine.
Return:
the deferred action.

getChannelMessenger
public final Messenger getChannelMessenger(PeerGroupID redirection, String service, String serviceParam)

getDestAddressToUse
protected EndpointAddress getDestAddressToUse(String service, String serviceParam)
A trivial convenience method that transports still depend upon. The reason it exists is that it used to be non-trivial, when the group redirection would sometimes be done at this point (the transports could be asked to send to the non-mangled service and param, when the application used the implicit defaults). This is no longer true: the transport (the blocking messenger) is always invoked with a fully defaulted and mangled service name and param. So all we have to do is paste them all together. Eventually the blocking messenger could simply be invoked with an already computed full destination.
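As a rough illustration of "pasting them all together", the sketch below joins a base address, a service name, and a service parameter with slashes. DestAddressSketch is a hypothetical helper working on plain strings; the slash-separated layout is an assumption made for the example and the real method builds an EndpointAddress instead.

```java
// Hypothetical helper, not the JXTA EndpointAddress API.
class DestAddressSketch {
    /** Pastes base address, service name and service parameter together. */
    static String destAddressToUse(String base, String service, String serviceParam) {
        StringBuilder sb = new StringBuilder(base);
        if (service != null) {
            sb.append('/').append(service);
            if (serviceParam != null) {
                sb.append('/').append(serviceParam);
            }
        }
        return sb.toString();
    }
}
```

No defaulting or mangling happens here, which matches the statement that the caller has already done that work.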

getLogicalDestinationAddress
public final EndpointAddress getLogicalDestinationAddress()
{@inheritDoc}

getLogicalDestinationAddress() requires resolution (it's the address advertised by the other end). For a blocking messenger it's easy. We're born resolved. So, just ask the implementer what it is.


getLogicalDestinationImpl
protected abstract EndpointAddress getLogicalDestinationImpl()
Obtain the logical destination address from the implementer (a transport for example).

getState
public final int getState()

isClosed
public boolean isClosed()
We overload isClosed because many messengers still invoke super.isClosed() for their own implementation and they expect it to be true only when all is shutdown; not while we're closing gently. FIXME - jice@jxta.org 20040413: transports should get a deeper retrofit eventually.
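The interplay between the final close(), the abstract closeImpl(), and the lieToOldTransports flag can be sketched as follows. GentleCloser and LegacyTransportMessenger are hypothetical stand-ins written for this illustration; the real class routes close through its state machine, which is omitted here.

```java
// Hypothetical stand-in, not the JXTA class.
abstract class GentleCloser {
    private boolean closed = false;
    private boolean lieToOldTransports = false; // name taken from the field list

    /** Final so that subclasses must move their logic into closeImpl(). */
    public final void close() {
        synchronized (this) {
            if (closed) {
                return;
            }
            closed = true;
            lieToOldTransports = true; // pretend to still be open during closeImpl()
        }
        try {
            closeImpl();
        } finally {
            synchronized (this) {
                lieToOldTransports = false;
            }
        }
    }

    /** Reports closed only once closeImpl() is no longer running. */
    public synchronized boolean isClosed() {
        return closed && !lieToOldTransports;
    }

    protected abstract void closeImpl();
}

// A legacy-style transport messenger that refuses to clean up if it believes it is closed.
class LegacyTransportMessenger extends GentleCloser {
    boolean resourcesReleased = false;

    @Override
    protected void closeImpl() {
        if (isClosed()) {
            return; // legacy guard: would skip the cleanup if told the truth
        }
        resourcesReleased = true;
    }
}
```

Without the flag, the legacy guard in closeImpl() would see a closed messenger and never release its resources.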

isIdleImpl
protected abstract boolean isIdleImpl()
Returns true if this messenger has not been used for a long time. The definition of a long time is: "so long that closing it is worth the risk of having to re-open". A messenger should close itself if it thinks it meets the definition of idle. BlockingMessenger leaves the evaluation to the transport but does the work automatically. Important: if self destruction is used, this method must work, not just return false. See the constructor. In general, if closeImpl does not need to do anything, then self destruction is not needed.
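One way a transport might back isIdleImpl is a last-used timestamp checked by a periodic timer task. The sketch below is an assumption about how that could look, not the JXTA implementation; IdleWatch, its injected clock, and the idle threshold are all hypothetical names introduced for the example.

```java
import java.util.TimerTask;
import java.util.function.LongSupplier;

// Hypothetical helper a transport might use to back isIdleImpl().
class IdleWatch {
    private final LongSupplier clock;     // injected so the logic can be tested
    private final long idleAfterMillis;   // hypothetical idle threshold
    private long lastUsed;

    IdleWatch(LongSupplier clock, long idleAfterMillis) {
        this.clock = clock;
        this.idleAfterMillis = idleAfterMillis;
        this.lastUsed = clock.getAsLong();
    }

    /** Call on every send to record activity. */
    synchronized void touch() {
        lastUsed = clock.getAsLong();
    }

    /** True once no activity has been seen for idleAfterMillis. */
    synchronized boolean isIdle() {
        return clock.getAsLong() - lastUsed >= idleAfterMillis;
    }

    /** Builds a task that runs closeAction the first time it finds the watch idle. */
    TimerTask selfDestructTask(final Runnable closeAction) {
        return new TimerTask() {
            @Override
            public void run() {
                if (isIdle()) {
                    closeAction.run();
                    cancel(); // no further checks once closed
                }
            }
        };
    }
}
```

Such a task could be handed to a java.util.Timer with a fixed period, passing System::currentTimeMillis as the clock. An isIdle() that always returned false would keep the task, and therefore the messenger, alive forever, which is the situation the warning above is about.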

performDeferredAction
private void performDeferredAction(int action)
Three exposed methods may need to inject new events in the system: sendMessageN, close, and shutdown. Since all three can cause actions, and since connectAction and startAction are deferred, it seems possible that one of the actions caused by send, close, or shutdown could be called while connectAction or startAction is in progress. However, the state machine gives us a few guarantees: connectAction and startAction can never nest. We will not be asked to perform one while still performing the other. Only the synchronous actions closeInput, closeOutput, or failAll can possibly be requested in the interval. We could make more assumptions and simplify the code, but we would rather keep at least some flexibility.
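The general shape of a deferred action is: record what must be done while holding the state lock, then do it after releasing the lock. The sketch below shows that shape only. DeferredActionSketch is a hypothetical class; the ACTION_* values come from the field list, while the method names other than performDeferredAction are invented for the example.

```java
// Hypothetical stand-in illustrating the deferred-action pattern.
class DeferredActionSketch {
    static final int ACTION_NONE = 0;
    static final int ACTION_SEND = 1;
    static final int ACTION_CONNECT = 2;

    private final Object stateLock = new Object();
    private int deferredAction = ACTION_NONE;
    final StringBuilder log = new StringBuilder(); // records what was performed

    /** Reads and clears the pending action; must be called while holding stateLock. */
    private int takeDeferredAction() {
        int action = deferredAction;
        deferredAction = ACTION_NONE;
        return action;
    }

    void onSendRequested() {
        int action;
        synchronized (stateLock) {
            deferredAction = ACTION_SEND;   // the state machine only records the request
            action = takeDeferredAction();
        }
        performDeferredAction(action);      // the slow work happens outside the lock
    }

    void onConnectRequested() {
        int action;
        synchronized (stateLock) {
            deferredAction = ACTION_CONNECT;
            action = takeDeferredAction();
        }
        performDeferredAction(action);
    }

    private void performDeferredAction(int action) {
        switch (action) {
            case ACTION_SEND:
                log.append("send;");
                break;
            case ACTION_CONNECT:
                log.append("cantConnect;");
                break;
            default:
                break; // ACTION_NONE: nothing to do
        }
    }
}
```

Keeping the lock short in this way is what lets synchronous requests such as closeInput arrive while a deferred action is still running.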

resolve
public final void resolve()

sendIt
private void sendIt()
Performs the ACTION_SEND deferred action: sends the one message in our one-message queue. This method *never* sets the outcome message property. This is left to sendMessageN and sendMessageB, because sendMessageB does not want to set it in any case other than success, while sendMessageN does it in all cases. The problem with that is: how do we communicate the outcome to sendMessage{NB} without having to keep the one-message queue locked until then (which would contradict how we interact with the state machine)? To make it really inexpensive, here's the trick: when a message fails, currentMessage and currentThrowable remain. So the sendMessage routine can check them and know that it is its message, and not another one, that caused the failure. If all is well, currentMessage and currentThrowable are nulled, and if another message is sent immediately, sendMessage is able to see that its own message was processed fully. (This is a small cheat regarding the state of saturation after failAll, but that's not actually detectable from the outside: input is closed before failAll anyway. See failAll for that part.)
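The trick of leaving the failed message in place so that the caller can recognise its own failure can be sketched like this. OutcomeSketch and its Transport interface are hypothetical; messages are plain strings, and the saturation check and state machine are left out to keep the focus on how the outcome is communicated.

```java
import java.io.IOException;

// Hypothetical stand-in, not the JXTA class.
class OutcomeSketch {
    /** Hypothetical transport hook; plays the role of sendMessageBImpl. */
    interface Transport {
        void send(String msg) throws IOException;
    }

    private final Transport transport;
    private String currentMessage = null;
    private Throwable currentThrowable = null;

    OutcomeSketch(Transport transport) {
        this.transport = transport;
    }

    /** Sends the stored message; never reports the outcome itself. */
    private void sendIt() {
        String msg;
        synchronized (this) {
            msg = currentMessage;
        }
        Throwable failure = null;
        try {
            transport.send(msg); // performed without holding the lock
        } catch (IOException | RuntimeException e) {
            failure = e;
        }
        synchronized (this) {
            if (failure == null) {
                currentMessage = null;      // success: both fields are nulled
                currentThrowable = null;
            } else {
                currentThrowable = failure; // failure: the message stays in place
            }
        }
    }

    /** Returns the failure if msg is still the stored message, else null. */
    private synchronized Throwable takeFailureFor(String msg) {
        if (currentMessage != msg) { // identity check: is it still our message?
            return null;
        }
        Throwable t = currentThrowable;
        currentMessage = null;
        currentThrowable = null;
        return t;
    }

    boolean sendN(String msg) {
        synchronized (this) {
            currentMessage = msg;
        }
        sendIt();
        return takeFailureFor(msg) == null;
    }

    void sendB(String msg) throws IOException {
        synchronized (this) {
            currentMessage = msg;
        }
        sendIt();
        Throwable t = takeFailureFor(msg);
        if (t instanceof IOException) {
            throw (IOException) t;
        }
        if (t != null) {
            throw new IOException("send failed", t);
        }
    }
}
```

Both callers decide how to report the result after sendIt() returns, so the one-message slot never has to stay locked across the send.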

sendMessageB
public void sendMessageB(Message msg, String service, String serviceParam)
{@inheritDoc}
Throws:
IOException

sendMessageBImpl
protected abstract boolean sendMessageBImpl(Message message, String service, String param)
Sends the message, blocking as needed. Throws IOException or a runtime exception as needed. The boolean return value is there for historical reasons, so that transports just need to rename their sendMessage() methods; no other code change is needed.
Throws:
IOException

sendMessageN
public final boolean sendMessageN(Message msg, String service, String serviceParam)

setOwner
public void setOwner(Object owner)
Sets an owner for this blocking messenger. Owners are normally canonical messengers. The goal of registering the owner is to keep that owner reachable as long as this blocking messenger is. Canonical messengers are otherwise softly referenced, and so may be deleted whenever memory is tight. We do not want to use finalizers or the equivalent reference queue mechanism, so we have no idea when a blocking messenger is no longer referenced by any canonical. In addition, it may be expensive to make, and so we want to keep it for a while anyway. As a result, instead of keeping a blocking messenger open as long as there is a canonical, we do the opposite: we keep the canonical (the owner, here) as long as the blocking messenger is open (and usually beyond, memory allowing). How long a blocking messenger will stay around depends upon that messenger's implementation. That may even be left up to the GC, in the end (if close is not needed AND the messenger is cheap to make). In that case, the owner is likely the only referrer, and so both will have the same lifetime.
Parameters:
owner The object that should be kept referenced at least as long as this one.
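The direction of the references can be made concrete with a small sketch: the registry holds the canonical messenger only softly, while the blocking messenger holds it strongly through its owner field. OwnedMessenger, CanonicalSketch, and the getOwner accessor are hypothetical names used only for this illustration.

```java
import java.lang.ref.SoftReference;

// Hypothetical stand-in for the blocking messenger side.
class OwnedMessenger {
    private Object owner = null;

    /** Keeps the owner strongly reachable for as long as this object is. */
    synchronized void setOwner(Object owner) {
        this.owner = owner;
    }

    /** Accessor added for the illustration only. */
    synchronized Object getOwner() {
        return owner;
    }
}

// Hypothetical stand-in for a canonical messenger wrapping a blocking one.
class CanonicalSketch {
    final OwnedMessenger blocking;

    private CanonicalSketch(OwnedMessenger blocking) {
        this.blocking = blocking;
    }

    /** Creates the canonical and registers it as the owner of the blocking messenger. */
    static CanonicalSketch wrap(OwnedMessenger blocking) {
        CanonicalSketch canonical = new CanonicalSketch(blocking);
        blocking.setOwner(canonical);
        return canonical;
    }

    /** What a registry might keep: only a soft reference to the canonical. */
    static SoftReference<CanonicalSketch> softlyRegistered(OwnedMessenger blocking) {
        return new SoftReference<>(wrap(blocking));
    }
}
```

As long as something (for example the self destruct timer) still references the OwnedMessenger, the canonical stays strongly reachable through the owner field and the soft reference cannot be cleared.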

shutdown
protected final void shutdown()
A transport may call this to cause an orderly closure of its messengers.

storeCurrent
private void storeCurrent(Message msg, String service, String param)