Thanks so much for the response, Jeremy. I hacked together something that seems to work, using a slightly different approach than yours. I didn't touch the AsyncHttpRequestHandlingMessageAdapter; instead I changed the broadcast method in HttpMessageBroadcaster. My version only works for users authenticated through Spring Security, and only for messages whose payload implements my UserMessage bean, which has a userId field. I could easily combine your broadcast with mine so it handles all cases, but for now this is the only case I need.
Code:
protected void broadcast(AtmosphereResource<?, ?> resource, AtmosphereResourceEvent event) {
    HttpServletRequest request = (HttpServletRequest) resource.getRequest();
    HttpServletResponse response = (HttpServletResponse) resource.getResponse();

    // Look up the authenticated Spring Security user from the HTTP session.
    SecurityContext securityContext = (SecurityContext) request.getSession()
            .getAttribute(HttpSessionSecurityContextRepository.SPRING_SECURITY_CONTEXT_KEY);
    Integer userId = null;
    if (securityContext != null
            && securityContext.getAuthentication() != null
            && securityContext.getAuthentication().getPrincipal() instanceof User) {
        userId = ((User) securityContext.getAuthentication().getPrincipal()).getUserId();
    }
    else {
        throw new MessagingException("No authenticated user for user broadcast!");
    }

    // Keep only the messages whose UserMessage payload is addressed to this user.
    // Note: ids are compared with equals(), because == on two boxed Integers
    // compares references rather than values.
    List<HttpBroadcastMessage> messages = new ArrayList<HttpBroadcastMessage>();
    if (event.getMessage() instanceof List) {
        List<?> messageBacklog = (List<?>) event.getMessage();
        if (!messageBacklog.isEmpty() && messageBacklog.get(0) instanceof HttpBroadcastMessage) {
            for (Object message : messageBacklog) {
                if (message instanceof HttpBroadcastMessage
                        && ((HttpBroadcastMessage) message).getMessage().getPayload() instanceof UserMessage
                        && userId.equals(((UserMessage) ((HttpBroadcastMessage) message).getMessage().getPayload()).getUserId())) {
                    messages.add((HttpBroadcastMessage) message);
                }
            }
        }
    }
    else if (HttpBroadcastMessage.class.isAssignableFrom(event.getMessage().getClass())
            && ((HttpBroadcastMessage) event.getMessage()).getMessage().getPayload() instanceof UserMessage
            && userId.equals(((UserMessage) ((HttpBroadcastMessage) event.getMessage()).getMessage().getPayload()).getUserId())) {
        messages.add((HttpBroadcastMessage) event.getMessage());
    }
    else {
        throw new MessagingException("Illegal message for user broadcast!");
    }

    if (messages.isEmpty()) {
        log.info("User " + userId + " has no messages.");
        return;
    }
    Message<?> broadcastMessage = mergeMessagesForBroadcast(messages);

    // The rest of the code after this point is the same as Jeremy Grelle's version.
    try {
        response.getOutputStream();
    }
    catch (Exception ex) {
        throw new MessagingException("Cannot get the Servlet OutputStream for delivering async Message to browser client. "
                + "Ensure that you've set 'useStreamForFlushingComments' to true in the AtmosphereServlet.", ex);
    }

    HttpMessageBroadcasterResponseWrapper responseWrapper = new HttpMessageBroadcasterResponseWrapper(response);
    messageMapper.writeMessage(request, responseWrapper, broadcastMessage, true, headerMapper);
    try {
        byte[] body = responseWrapper.toByteArray();
        response.getOutputStream().write(body);
        if (log.isInfoEnabled()) {
            log.info("Wrote " + body.length + " bytes to response.");
        }
        response.getOutputStream().flush();
    }
    catch (IOException ex) {
        throw new MessagingException("Failed to write async Message to browser client.", ex);
    }

    // Resume the suspended connection if the client asked for that (long polling).
    Boolean resumeOnBroadcast = (Boolean) request.getAttribute(AtmosphereServlet.RESUME_ON_BROADCAST);
    if (resumeOnBroadcast != null && resumeOnBroadcast) {
        resource.resume();
    }
}
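Stripped of the Atmosphere and Spring plumbing, the per-user filtering in that method boils down to roughly the sketch below. This is only an illustration under assumptions: the UserMessage class here is a made-up stand-in for my bean (I'm assuming the id is an Integer), and UserMessageFilterSketch and forUser are hypothetical names, not part of Spring Integration or Atmosphere. It compares ids with Objects.equals rather than ==, since == on two boxed Integers compares references, not values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

class UserMessageFilterSketch {

    /** Hypothetical stand-in for the UserMessage bean: a payload tagged with a userId. */
    static final class UserMessage {
        private final Integer userId;
        private final String text;

        UserMessage(Integer userId, String text) {
            this.userId = userId;
            this.text = text;
        }

        Integer getUserId() { return userId; }
        String getText() { return text; }
    }

    /** Keeps only the payloads that are UserMessages addressed to the given user. */
    static List<UserMessage> forUser(Integer userId, List<?> backlog) {
        List<UserMessage> matches = new ArrayList<UserMessage>();
        for (Object payload : backlog) {
            // Objects.equals compares by value and tolerates null ids.
            if (payload instanceof UserMessage
                    && Objects.equals(((UserMessage) payload).getUserId(), userId)) {
                matches.add((UserMessage) payload);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Ids above 127 on purpose: they fall outside the default Integer cache,
        // which is where a == comparison on boxed values would go wrong.
        List<Object> backlog = Arrays.<Object>asList(
                new UserMessage(1000, "for user 1000"),
                new UserMessage(2000, "for user 2000"),
                "not a UserMessage",
                new UserMessage(1000, "also for user 1000"));

        System.out.println(forUser(1000, backlog).size()); // prints 2
    }
}
```

Anything that is not a UserMessage is silently skipped here, which is different from the real method above, where an unexpected single message raises a MessagingException.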
Not sure if my way of changing the broadcaster is a good way of doing it. I think I'll explore your suggestions a bit more. The one thing that's annoying is that the BroadcasterCache isn't working. I see your comments at the top saying it's not working and that you have a message queue and threshold as a temporary solution. I'm trying to figure out why the cache isn't working in your code. Any suggestions on where I should look?