and in LS server i receive:
.....
15.Sep.09 15:04:07,761 < INFO> lastHeartbeatRandom is 3555
15.Sep.09 15:04:07,761 < INFO> Received heartbeat: 3555
15.Sep.09 15:04:07,761 < INFO> ------------------>Write UNLOCK 6
15.Sep.09 15:04:07,772 < INFO> ------------------>Write LOCK 7
15.Sep.09 15:04:07,773 < INFO> ------------------>Write UNLOCK 7
15.Sep.09 15:04:08,761 < INFO> Received message
15.Sep.09 15:04:08,762 < INFO> ------------------>Write LOCK 6
15.Sep.09 15:04:08,762 < INFO> lastHeartbeatRandom is 3555
......
But since i have two adapters (two different instances of the classes that extends my baseJMSadapter) i do not see why this is happeinng. The strange thing is that
15.Sep.09 15:04:04,170 < INFO> Inactive flag dispatched: manualbet (1) - lastHeartBeatRandom = -1 ?!?! And for that reason no jms message is send to the feeder, and that is happening only for ManualBetJMSAdapter, not for the other. Also if i remove the TickJMSAdapter from the adapters.xml ManualBetJMSAdapter is working properly.
Here is my LightStreamerEngine (like you call it Generator.java)
Code:
package com.trinitas.ls.feed;
import java.util.HashMap;
import java.util.Properties;
import java.util.Random;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.naming.NamingException;
import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;
import com.trinitas.common.jms.ConnectionLoop;
import com.trinitas.common.jms.ExtendedMessageListener;
import com.trinitas.common.jms.FeedMessage;
import com.trinitas.common.jms.HeartbeatMessage;
import com.trinitas.common.jms.JMSHandler;
import com.trinitas.common.jms.SubscribedItemAttributes;
public class LightstreamerEngine extends Thread implements ExtendedMessageListener, ExternalFeedListener {
private Random randomGen = new Random();
@Override
public void run() {
logger.info("generator is starting. Loading configuration...");
myFeed.start();
// start the loop that tries to connect to JMS
new ConnectionLoopTPQR(jmsHandler, recoveryPause, logger).start();
final HeartbeatThread heartneat = new HeartbeatThread();
heartneat.start();
try {
// for test wait forever
heartneat.join();
} catch (final Exception e) {
e.printStackTrace();
}
logger.info("Generator ready.");
}
/**
* This object handles communications with JMS. Hides the use of Session,
* Connections, Publishers etc..
*/
private JMSHandler jmsHandler;
/**
* This Map contains info about the subscribed items.
*/
private HashMap<String, SubscribedItemAttributes> subscribedItems = new HashMap<String, SubscribedItemAttributes>();
private ExternalFeeder myFeed;
private int msgPoolSize;
private int recoveryPause;
/**
* A random id that represents the life of this generator. It is sent within
* the heartbeat to let Lightstreamer distinguish between 2 different lives
* of the Generator. In production scenarios this would be probably
* substituted with something more secure (i.e. it's not impossible to get 2
* identical randoms for 2 different Generator's lives).
*/
private int random = -1;
public LightstreamerEngine(String logConf, String providerURL, String initialContextFactory, String topicConnectionFactory, String queueConnectionFactory,
String topic, String queue, int msgPoolSize, int recoveryPause, ExternalFeeder feeder) {
this.msgPoolSize = msgPoolSize;
this.recoveryPause = recoveryPause;
while (random == -1) {
random = randomGen.nextInt(5000);
}
// configure a log4j logger
configureLogger(logConf);
// instantiate a JMSHandler
jmsHandler = new JMSHandler(logger, initialContextFactory, providerURL, queueConnectionFactory, queue, topicConnectionFactory,
topic);
// This Generator will be the JMS listener
jmsHandler.setListener(this);
// instantiate and start the simulator. This is the object that
// "produce" data
myFeed = feeder;
// This Generator will be the listener
myFeed.setFeedListener(this);
}
// ///// MessageListener
private static final String messageNoComp = "Message received was not compatible with this process. Maybe someone else sending messages? ";
private static final String subUnexItem = "(Subscribing) Unexpected item: ";
private static final String unsubUnexItem = "(Unsubscribing) Unexpected item: ";
private static final String unsubUnexHandle = "(Unsubscribing) Unexpected handle for item: ";
/**
* receive messages from JMSHandler
*/
public void onMessage(Message message) {
String feedMsg = null;
logger.info("Message received: processing...");
try {
// pull out text from the Message object
final TextMessage textMessage = (TextMessage) message;
feedMsg = textMessage.getText();
logger.info("Message:TextMessage received: " + feedMsg);
} catch (final ClassCastException cce) {
// if message isn't a TextMessage then this update is not "correct"
logger.warn(messageNoComp + "(ClassCastException)");
return;
} catch (final JMSException jmse) {
logger.error("Generator.onMessage - JMSException: " + jmse.getMessage());
return;
}
String itemName = null;
String handleId = null;
if (feedMsg != null) {
logger.info("Recived message: " + feedMsg);
if (feedMsg.equals("reset")) {
reset();
}
if (feedMsg.indexOf("subscribe") == 0) {
// this is a subscribe message
itemName = feedMsg.substring(9, feedMsg.indexOf("_"));
handleId = feedMsg.substring(feedMsg.indexOf("_") + 1);
subscribe(itemName, handleId);
} else if (feedMsg.indexOf("unsubscribe") == 0) {
// this is a unsubscribe message
itemName = feedMsg.substring(11, feedMsg.indexOf("_"));
handleId = feedMsg.substring(feedMsg.indexOf("_") + 1);
unsubscribe(itemName, handleId);
}
}
if (itemName == null) {
// the message isn't a valid message
logger.warn(messageNoComp + "Message: " + feedMsg);
}
}
private void reset() {
synchronized (subscribedItems) {
subscribedItems = new HashMap<String, SubscribedItemAttributes>();
}
}
private void subscribe(String itemName, String handleId) {
logger.info("Subscribing " + itemName + " (" + handleId + ")");
logger.info("(Subscribing) Valid item: " + itemName + " (" + handleId + ")");
synchronized (subscribedItems) {
// put the item in the subscribedItems map
// if another subscription is already in that will be replaced
final SubscribedItemAttributes attr = new SubscribedItemAttributes(itemName, handleId);
subscribedItems.put(itemName, attr);
}
// now we ask the feed for the snapshot; our feed will insert
// an event with snapshot information into the normal updates flow
myFeed.forceSendCurrentValues(itemName);
logger.info("Subscribed " + itemName + " (" + handleId + ")");
}
private void unsubscribe(String itemName, String handleId) {
logger.info("Unsubscribing " + itemName + " (" + handleId + ")");
synchronized (subscribedItems) {
if (!subscribedItems.containsKey(itemName)) {
// here checks are useless, just try to get the item from the
// subscribedItems map, if not contained there is an error
logger.error(unsubUnexItem + itemName + " (" + handleId + ")");
return;
}
final SubscribedItemAttributes sia = subscribedItems.get(itemName);
if (sia.handleId == handleId) {
// remove the item from the subscribedItems map.
subscribedItems.remove(itemName);
} else {
logger.warn(unsubUnexHandle + itemName + " (" + handleId + ")");
}
}
logger.info("Unsubscribed " + itemName + " (" + handleId + ")");
}
public void onException(JMSException arg0) {
// we have lost the connection to JMS
synchronized (subscribedItems) {
// empty the subscribedItems map; this way, once reconnected
// we are able to re-send snapshots
subscribedItems = new HashMap<String, SubscribedItemAttributes>();
}
// and loop to try to reconnect
new ConnectionLoopTPQR(jmsHandler, recoveryPause, logger).start();
}
// ///////////// ExternalFeedListener
/**
* Receive update from the feeder.
*/
public void onEvent(String itemName, HashMap currentValues, boolean isSnapshot) {
SubscribedItemAttributes sia = null;
synchronized (subscribedItems) {
if (!subscribedItems.containsKey(itemName)) {
// simulator always produce all updates. Here we filter
// non-subscribed items
return;
}
// handle the snapshot status
sia = subscribedItems.get(itemName);
if (!sia.isSnapshotSent) {
if (!isSnapshot) {
// we ignore the update and keep waiting until
// a full snapshot for the item has been received
return;
}
// this is the snapshot update
sia.isSnapshotSent = true;
} else {
if (isSnapshot) {
// it's not the first event we have received carrying
// snapshot information for the item; so, this event
// is not a snapshot from Lightstreamer point of view
isSnapshot = false;
}
}
}
// prepare the object to send through JMS
final FeedMessage toSend = new FeedMessage(itemName, currentValues, isSnapshot, sia.handleId, this.random);
try {
// publish the update to JMS
jmsHandler.publishMessage(toSend);
} catch (final JMSException je) {
logger.error("Unable to send message - JMSException:" + je.getMessage());
}
}
// /////////// Utils
private static String noConf = "Please specify a valid configuration file as parameter.\nProcess exits.\n";
private static String noParam = " is missing.\nProcess exits";
private static String useDefault = " is missing. Using default.";
private static String noLog = "Log configuration fails. Check you configuration files\nProcess exits";
private static String isNaN = " must be a number but it isn't. Using default.";
private static Logger logger;
public static int getParam(Properties params, String toGet, boolean required, int def) {
int resInt;
final String res = params.getProperty(toGet);
if (res == null) {
if (required) {
throw new IllegalStateException(toGet + noParam);
} else {
if (logger != null) {
logger.warn(toGet + useDefault);
}
resInt = def;
}
} else {
try {
resInt = Integer.parseInt(res);
} catch (final NumberFormatException nfe) {
if (logger != null) {
logger.error(toGet + isNaN);
}
resInt = def;
}
}
if (logger != null) {
logger.info(toGet + ": " + resInt);
}
return resInt;
}
public static String getParam(Properties params, String toGet, boolean required, String def) {
String res = params.getProperty(toGet);
if (res == null) {
if (required) {
throw new IllegalStateException(toGet + noParam);
} else {
if (logger != null) {
logger.warn(toGet + useDefault);
}
res = def;
}
}
if (logger != null) {
logger.info(toGet + ": " + res);
}
return res;
}
public static void configureLogger(String logConf) {
if (logConf != null) {
try {
DOMConfigurator.configureAndWatch(logConf, 10000);
logger = Logger.getLogger("SLGenerator");
} catch (final Exception ex) {
ex.printStackTrace();
System.out.println(ex.getMessage());
throw new IllegalStateException(noLog);
}
} else {
System.out.println(noLog);
throw new IllegalStateException(noLog);
}
}
// //////////////////// RecoveryThread
private class ConnectionLoopTPQR extends ConnectionLoop {
public ConnectionLoopTPQR(JMSHandler jmsHandler, int recoveryPause, Logger logger) {
super(jmsHandler, recoveryPause, logger);
}
@Override
protected void onConnectionCall() {
// nothing to do
return;
}
@Override
protected void connectionCall() throws JMSException, NamingException {
// initialize TopicPublisher and QueueReceiver
jmsHandler.initTopicPublisher(msgPoolSize);
jmsHandler.initQueueReceiver();
}
}
// //////////////////// HeartbeatThread
private class HeartbeatThread extends Thread {
private HeartbeatMessage fixedMessage = new HeartbeatMessage(random);
@Override
public void run() {
while (true) {
try {
// publish the update to JMS
jmsHandler.publishMessage(fixedMessage);
} catch (final JMSException je) {
logger.error("Unable to send message - JMSException:" + je.getMessage());
}
logger.info("Heartbeat sent: " + fixedMessage.random);
try {
Thread.sleep(1000);
} catch (final InterruptedException e) {
}
}
}
}
}
And here is how i start it in my jboss managed bean:
Code:
private void initLightStreamer() throws IOException {
final InputStream manualConfig = BLogicService.class.getResourceAsStream(MANUAL_BET_LIGHT_STREAMER_JMS_CONFIG_FILE);
final Properties params = new Properties();
params.load(manualConfig);
final LightstreamerEngine manualBetsEngine = new LightstreamerEngine(LightstreamerEngine.getParam(params, "logConf", true, null), LightstreamerEngine.getParam(params, "jmsUrl", true, null),
LightstreamerEngine.getParam(params, "initialContextFactory", true, null), LightstreamerEngine.getParam(params,
"topicConnectionFactory", true, null), LightstreamerEngine.getParam(params, "queueConnectionFactory", true, null),
LightstreamerEngine.getParam(params, "topicName", true, null), LightstreamerEngine
.getParam(params, "queueName", true, null), LightstreamerEngine.getParam(params, "msgPoolSize", false, 15),
LightstreamerEngine.getParam(params, "recoveryPauseMillis", false, 2000), ManualBetFeeder.getInstance());
manualBetsEngine.start();
final InputStream ticksConfig = BLogicService.class.getResourceAsStream(TICKS_LIGHT_STREAMER_JMS_CONFIG_FILE);
final Properties params1 = new Properties();
params1.load(ticksConfig);
final LightstreamerEngine tickEngine = new LightstreamerEngine(LightstreamerEngine.getParam(params1, "logConf", true, null), LightstreamerEngine.getParam(params1, "jmsUrl", true, null),
LightstreamerEngine.getParam(params1, "initialContextFactory", true, null), LightstreamerEngine.getParam(params1,
"topicConnectionFactory", true, null), LightstreamerEngine.getParam(params1, "queueConnectionFactory", true, null),
LightstreamerEngine.getParam(params1, "topicName", true, null), LightstreamerEngine
.getParam(params1, "queueName", true, null), LightstreamerEngine.getParam(params1, "msgPoolSize", false, 15),
LightstreamerEngine.getParam(params1, "recoveryPauseMillis", false, 2000), TickFeeder.getInstance());
tickEngine.start();
}
I know that this does not exactly connected to LS but pls help.
P.S.:Excuse me for posting this in new thread.
Bookmarks