Results 1 to 6 of 6
  1. #1
    Senior Member
    Join Date
    Jul 2009
    Location
    sofia
    Posts
    40

    How LS server uses the deployed adapter(s)?

    Hi, i am still having this annoying bug, with the adapters. I have base jms adapter(minor changes from the demo):
    Code:
    package com.trinitas.ls.adapter.data;
    
    import java.io.File;
    import java.util.Enumeration;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.ObjectMessage;
    import javax.naming.NamingException;
    
    import org.apache.log4j.Logger;
    
    import com.lightstreamer.interfaces.data.DataProviderException;
    import com.lightstreamer.interfaces.data.FailureException;
    import com.lightstreamer.interfaces.data.ItemEventListener;
    import com.lightstreamer.interfaces.data.SubscriptionException;
    import com.trinitas.common.jms.ConnectionLoop;
    import com.trinitas.common.jms.ExtendedMessageListener;
    import com.trinitas.common.jms.FeedMessage;
    import com.trinitas.common.jms.HeartbeatMessage;
    import com.trinitas.common.jms.JMSHandler;
    import com.trinitas.common.jms.SubscribedItemAttributes;
    
    public abstract class BaseJMSDataAdapter implements  ExtendedMessageListener {
    
    	protected String loggerName;
    	protected Logger logger;
    
    	protected JMSHandler jmsHandler;
    
    	protected ItemEventListener listener;
    
    	protected ConcurrentHashMap<String, SubscribedItemAttributes> subscribedItems = new ConcurrentHashMap<String, SubscribedItemAttributes>();
    
    	protected volatile int nextHandleId = 1;
    	protected ConcurrentHashMap<String, Object> handles = new ConcurrentHashMap<String, Object>();
    
    	protected int msgPoolSize;
    	protected int recoveryPause;
    
    	protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(false);
    	protected ConcurrentLinkedQueue<String> toSendRequests = new ConcurrentLinkedQueue<String>();
    
    	/**
    	 * Status variables. This adapter has 3 possible states: 1) jmsOk=false and
    	 * lastHeartbeatRandom=-1: no connection to JMS is available; 2) jmsOk=true
    	 * and lastHeartbeatRandom=-1: the adapter is connected to JMS but the
    	 * Generator is not running; 3) jmsOk=true and lastHeartbeatRandom!=-1:
    	 * connection to JMS is established and the adapter is receiving heartbeats
    	 * (and/or data) by the Generator.
    	 */
    	protected volatile boolean jmsOk = false;
    	protected int lastHeartbeatRandom = -1;
    	protected int heartbeatCount = 0;
    //	protected  HashMap<String, String> inactiveMap = new HashMap<String, String>();
    //	protected HashMap<String, String> completeInactiveMap = new HashMap<String, String>();
    //	
    
    	// //////////////// DataProvider
    	public void init(Map params, File arg1) throws DataProviderException {
    		loggerName = getParam(params, "loggerName", false, "LightstreamerLogger.BaseJMSAdapter");
    		logger = Logger.getLogger(loggerName);
    		final String providerURL = getParam(params, "jmsUrl", true, null);
    		final String initialContextFactory = getParam(params, "initialContextFactory", true, null);
    		final String topicConnectionFactory = getParam(params, "topicConnectionFactory", true, null);
    		final String queueConnectionFactory = getParam(params, "queueConnectionFactory", true, null);
    		final String topic = getParam(params, "topicName", true, null);
    		final String queue = getParam(params, "queueName", true, null);
    		this.msgPoolSize = getParam(params, "msgPoolSize", false, 15);
    		this.recoveryPause = getParam(params, "recoveryPauseMillis", false, 2000);
    		logger.info("Configuration read.");
    		jmsHandler = new JMSHandler(logger, initialContextFactory, providerURL, queueConnectionFactory, queue, topicConnectionFactory,
    				topic);
    		jmsHandler.setListener(this);
    		new ConnectionLoopTSQS(jmsHandler, recoveryPause, logger).start();
    		logger.info("BaseJMSDataAdapter ready.");
    	}
    
    	public void setListener(ItemEventListener listener) {
    		this.listener = listener;
    	}
    
    	protected abstract  boolean isValidItem(String itemName);
    
    	
    	public void subscribe(String itemName, Object itemHandle, boolean needsIterator) throws SubscriptionException, FailureException {
    		logger.info("Subscribing to " + itemName);
    		if (!isValidItem(itemName)) {
    			throw new SubscriptionException("(Subscribing) Unexpected item: " + itemName);
    		}
    		logger.info("(Subscribing) Valid item: " + itemName);
    		final String uniqueId = String.valueOf(nextHandleId++);
    		final SubscribedItemAttributes itemAttrs = new SubscribedItemAttributes(itemName, uniqueId);
    		rwLock.writeLock().lock();
    		logger.info("------------------>Write LOCK 1");
    		subscribedItems.put(itemName, itemAttrs);
    		handles.put(uniqueId, itemHandle);
    		boolean dispatchThread = false;
    		if (lastHeartbeatRandom == -1) {
    			dispatchInactiveFlag(itemAttrs);
    		} else {
    			toSendRequests.offer("subscribe" + itemName + "_" + uniqueId);
    			dispatchThread = true;
    		}
    		logger.info("------------------>Write UNLOCK 1");
    		rwLock.writeLock().unlock();
    		logger.info("(Subscribing) Inserted in subscribed items list: " + itemName + " (" + uniqueId + ")");
    		if (dispatchThread) {
    			new SenderThread().start();
    		}
    	}
    
    	public void subscribe(String itemName, boolean needsIterator) throws SubscriptionException, FailureException {
    		// NEVER CALLED
    	}
    
    	public void unsubscribe(String itemName) throws SubscriptionException, FailureException {
    		logger.info("Unsubscribing from " + itemName);
    		rwLock.writeLock().lock();
    		logger.info("------------------>Write LOCK 2");
    		if (!subscribedItems.containsKey(itemName)) {
    			logger.info("------------------>Write UNLOCK 2");
    			rwLock.writeLock().unlock();
    			throw new SubscriptionException("(Unsubscribing) Unexpected item: " + itemName);
    		}
    		final SubscribedItemAttributes item = subscribedItems.get(itemName);
    		subscribedItems.remove(itemName);
    		handles.remove(item.handleId);
    		boolean dispatchThread = false;
    		if (lastHeartbeatRandom != -1) {
    			toSendRequests.offer("unsubscribe" + itemName + "_" + item.handleId);
    			dispatchThread = true;
    		}
    		logger.info("------------------>Write UNLOCK 2");
    		rwLock.writeLock().unlock();
    		logger.info("(Unsubscribing) removed from subscribed items list:" + itemName + " (" + item.handleId + ")");
    		if (dispatchThread) {
    			new SenderThread().start();
    		}
    	}
    
    	public abstract boolean isSnapshotAvailable(String itemName) throws SubscriptionException;
    
    	
    	
    	
    	protected void dispatchInactiveFlag(SubscribedItemAttributes item) {
    		final boolean isSnapshot = !item.isSnapshotSent;
    		if (isSnapshot) {
    			item.isSnapshotSent = true;
    		}
    
    		final Object handle = handles.get(item.handleId);
    		if (handle != null) {
    			// we do not need this, since we do not need default values to be set when the feeder is down
    //			if (isSnapshot) {
    //				listener.smartUpdate(handle, completeInactiveMap, isSnapshot);
    //			} else {
    //				listener.smartUpdate(handle, inactiveMap, isSnapshot);
    //			}
    		}
    		logger.info("Inactive flag dispatched: " + item.itemName + " (" + item.handleId + ")");
    
    	}
    
    	// /////////MessageListener
    
    	protected static final String noCompMex = "Message received was not compatible with this adapter.";
    
    	public void onConnection() {
    		rwLock.writeLock().lock();
    		logger.info("------------------>Write LOCK 3");
    		logger.info("JMS is now up");
    		jmsOk = true;
    		logger.info("------------------>Write UNLOCK 3");
    		rwLock.writeLock().unlock();
    	}
    
    	public void subscribeAll() {
    		logger.info("Subscribing all to the Generator");
    		toSendRequests.clear();
    		toSendRequests.offer("reset");
    		final Enumeration<SubscribedItemAttributes> subItems = subscribedItems.elements();
    		while (subItems.hasMoreElements()) {
    			final SubscribedItemAttributes sia = subItems.nextElement();
    			toSendRequests.offer("subscribe " + sia.itemName + "_" + sia.handleId);
    		}
    		new SenderThread().start();
    	}
    
    	public void onException(JMSException je) {
    		logger.error("onException: JMSException -> " + je.getMessage());
    		rwLock.writeLock().lock();
    		logger.info("------------------>Write LOCK 4");
    		logger.info("JMS is now down");
    		this.onFeedDisconnection();
    		jmsOk = false;
    		logger.info("------------------>Write UNLOCK 4");
    		rwLock.writeLock().unlock();
    		new ConnectionLoopTSQS(jmsHandler, recoveryPause, logger).start();
    	}
    
    	public void onFeedDisconnection() {
    		logger.info("Feed no more available");
    		lastHeartbeatRandom = -1;
    		final Enumeration<SubscribedItemAttributes> subItems = subscribedItems.elements();
    		while (subItems.hasMoreElements()) {
    			final SubscribedItemAttributes sia = subItems.nextElement();
    			this.dispatchInactiveFlag(sia);
    		}
    	}
    
    	public void onMessage(Message message) {
    		if (message == null) {
    			logger.warn(noCompMex + " (null)");
    			return;
    		}
    		logger.info("Received message");
    		FeedMessage feedMsg = null;
    		SubscribedItemAttributes item = null;
    		try {
    			final ObjectMessage objectMessage = (ObjectMessage) message;
    			try {
    				final HeartbeatMessage beat = (HeartbeatMessage) objectMessage.getObject();
    				handleHeartbeat(beat.random);
    				return;
    			} catch (final ClassCastException jmse) {
    				feedMsg = (FeedMessage) objectMessage.getObject();
    				if (!handleHeartbeat(feedMsg.random)) {
    					return;
    				}
    			}
    			logger.info("Valid message");
    		} catch (final ClassCastException jmse) {
    			logger.warn(noCompMex + "(not a FeedMessage instance)");
    			return;
    		} catch (final JMSException jmse) {
    			logger.error("BaseJMSDataAdapter.onMessage - JMSException: " + jmse.getMessage(), jmse);
    			return;
    		}
    		rwLock.readLock().lock();
    		logger.info("------------------>Read LOCK 5");
    		if (!subscribedItems.containsKey(feedMsg.itemName)) {
    			logger.info("------------------>Read UNLOCK 5");
    			rwLock.readLock().unlock();
    			logger.info("Received update for not subscribed item: " + feedMsg.itemName);
    			return;
    		}
    
    		Object handle = null;
    		boolean isSnapshot = false;
    		item = subscribedItems.get(feedMsg.itemName);
    		if (item != null) {
    			handle = handles.get(feedMsg.handleId);
    			if (handle == null) {
    				logger.info("------------------>Read UNLOCK 5");
    				rwLock.readLock().unlock();
    				logger.info("Received update for unsubscribed handle: " + feedMsg.itemName + "(" + feedMsg.handleId + ")");
    				return;
    			}
    			if (!item.isSnapshotSent) {
    				item.isSnapshotSent = true;
    				isSnapshot = true;
    			}
    		}
    		logger.info("Received update for item " + feedMsg.itemName);
    		listener.smartUpdate(handle, feedMsg.currentValues, isSnapshot);
    		logger.info("------------------>Read UNLOCK 5");
    		rwLock.readLock().unlock();
    	}
    
    	protected boolean handleHeartbeat(int beat) {
    		rwLock.writeLock().lock();
    		logger.info("------------------>Write LOCK 6");
    		logger.info("lastHeartbeatRandom is " + lastHeartbeatRandom);
    		if (lastHeartbeatRandom == beat) {
    			heartbeatCount = (heartbeatCount + 1) % 1000;
    			logger.info("Received heartbeat: " + beat);
    			logger.info("------------------>Write UNLOCK 6");
    			rwLock.writeLock().unlock();
    			return true;
    		} else {
    			logger.info("Received NEW heartbeat: " + beat + ", feed is now available");
    			lastHeartbeatRandom = beat;
    			this.subscribeAll();
    			heartbeatCount = 0;
    			logger.info("------------------>Write UNLOCK 6");
    			rwLock.writeLock().unlock();
    			new HeartbeatThread(beat).start();
    			return false;
    		}
    	}
    
    	protected class HeartbeatThread extends Thread {
    
    		protected int random;
    		protected int count;
    
    		public HeartbeatThread(int random) {
    			this.random = random;
    		}
    
    		@Override
    		public void run() {
    			while (this.random == lastHeartbeatRandom) {
    				try {
    					Thread.sleep(2000);
    				} catch (final InterruptedException e) {
    				}
    				rwLock.writeLock().lock();
    				logger.info("------------------>Write LOCK 7");
    				if (this.random == lastHeartbeatRandom && count == heartbeatCount) {
    					logger.info("2 Seconds without Heartbeats: " + this.random);
    					onFeedDisconnection();
    					logger.info("------------------>Write UNLOCK 7");
    					rwLock.writeLock().unlock();
    					return;
    				} else {
    					count = heartbeatCount;
    				}
    				logger.info("------------------>Write UNLOCK 7");
    				rwLock.writeLock().unlock();
    			}
    		}
    
    	}
    
    	// /////////////// Utils
    	protected static String noParam = " is missing.\nProcess exits";
    	protected static String useDefault = " is missing. Using default.";
    	protected static String isNaN = " must be a number but it isn't. Using default.";
    
    	protected int getParam(Map params, String toGet, boolean required, int def) throws DataProviderException {
    		int resInt;
    		final String res = (String) params.get(toGet);
    		if (res == null) {
    			if (required) {
    				throw new DataProviderException(toGet + noParam);
    			} else {
    				if (logger != null) {
    					logger.warn(toGet + useDefault);
    				}
    				resInt = def;
    			}
    		} else {
    			try {
    				resInt = Integer.parseInt(res);
    			} catch (final NumberFormatException nfe) {
    				if (logger != null) {
    					logger.error(toGet + isNaN);
    				}
    				resInt = def;
    			}
    		}
    
    		if (logger != null) {
    			logger.info(toGet + ": " + resInt);
    		}
    		return resInt;
    	}
    
    	protected String getParam(Map params, String toGet, boolean required, String def) throws DataProviderException {
    		String res = (String) params.get(toGet);
    		if (res == null) {
    			if (required) {
    				throw new DataProviderException(toGet + noParam);
    			} else {
    				if (logger != null) {
    					logger.warn(toGet + useDefault);
    				}
    				res = def;
    			}
    		}
    		if (logger != null) {
    			logger.info(toGet + ": " + res);
    		}
    		return res;
    	}
    
    	// ////////////////// ConnectionLoop
    
    	protected class ConnectionLoopTSQS extends ConnectionLoop {
    
    		public ConnectionLoopTSQS(JMSHandler jmsHandler, int recoveryPause, Logger logger) {
    			super(jmsHandler, recoveryPause, logger);
    		}
    
    		@Override
    		protected void onConnectionCall() {
    			onConnection();
    		}
    
    		@Override
    		protected void connectionCall() throws JMSException, NamingException {
    			jmsHandler.initTopicSubscriber();
    			jmsHandler.initQueueSender(msgPoolSize);
    		}
    
    	}
    
    	public class SenderThread extends Thread {
    
    		@Override
    		public void run() {
    			String nextRequest = "";
    
    			logger.info("Dispatch thread started");
    			rwLock.readLock().lock();
    			logger.info("------------------>Read LOCK 8");
    			while ((nextRequest = toSendRequests.poll()) != null) {
    				try {
    					jmsHandler.sendMessage(nextRequest);
    					logger.info("Message dispatched to JMS: " + nextRequest);
    				} catch (final JMSException je) {
    					logger.error("Can't actually dispatch request " + nextRequest + ": JMSException -> " + je.getMessage());
    				}
    			}
    			logger.info("------------------>Read UNLOCK 8");
    			rwLock.readLock().unlock();
    			logger.info("Dispatch thread ends");
    		}
    
    	}
    }
    And i have other classes that extends this adapter:
    Code:
    package com.trinitas.ls.adapter.data;
    
    import com.lightstreamer.interfaces.data.FailureException;
    import com.lightstreamer.interfaces.data.SmartDataProvider;
    import com.lightstreamer.interfaces.data.SubscriptionException;
    
    public class ManualBetJMSDataAdapter extends BaseJMSDataAdapter  implements SmartDataProvider{
    
    //	static {
    //        inactiveMap.put("item_status","inactive");
    //        completeInactiveMap.put("item_status","inactive");	
    //	}
    	
    	@Override
    	public boolean isSnapshotAvailable(String itemName) throws SubscriptionException {
    		return false;
    	}
    
    	@Override
    	protected boolean isValidItem(String itemName) {
    		return true;
    	}
    
    	@Override
    	public void subscribe(String itemName, boolean needsIterator) throws SubscriptionException, FailureException {
    		super.subscribe(itemName, needsIterator);
    	}
    }
    and
    Code:
    package com.trinitas.ls.adapter.data;
    
    import com.lightstreamer.interfaces.data.SmartDataProvider;
    import com.lightstreamer.interfaces.data.SubscriptionException;
    
    public class TickJMSDataAdapter extends BaseJMSDataAdapter implements SmartDataProvider {
    
    //	static {
    //        inactiveMap.put("item_status","inactive");
    //        completeInactiveMap.put("item_status","inactive");	
    //	}
    	
    	@Override
    	public boolean isSnapshotAvailable(String itemName) throws SubscriptionException {
    		return false;
    	}
    
    	@Override
    	protected boolean isValidItem(String itemName) {
    		return true;
    	}
    
    }
    In my adapters.xml :
    Code:
    <?xml version="1.0"?>
    
    <adapters_conf id="CLAIRE">
    
        <metadata_provider>
    
            <adapter_class>com.trinitas.ls.adapter.meta.LiteralBasedProvider</adapter_class>
    
        </metadata_provider>
    
        <data_provider name="MANUALBETSJMSADAPTER">
            <adapter_class>com.trinitas.ls.adapter.data.ManualBetJMSDataAdapter</adapter_class>
    <!--        Parameters required by MANUALBETSJMSADAPTER  -->
    
            <param name="loggerName">LightstreamerLogger.ManualBetJMSDataAdapter</param>
            <param name="msgPoolSize">15</param>
            <param name="recoveryPauseMillis">2000</param>
    
    <!--        JBoss Messaging configuration -->
    
            <param name="jmsUrl">jnp://localhost:1099</param>
            <param name="initialContextFactory">org.jnp.interfaces.NamingContextFactory</param>
            <param name="topicConnectionFactory">ConnectionFactory</param>
            <param name="queueConnectionFactory">ConnectionFactory</param>
            <param name="topicName">topic/manualbetTopic</param>
            <param name="queueName">queue/manualbetconfQueue</param>
    
        </data_provider>
    
        <data_provider name="TICKSJMSADAPTER">
    
            <adapter_class>com.trinitas.ls.adapter.data.TickJMSDataAdapter</adapter_class>
    <!--        Parameters required by TICKSJMSADAPTER  -->
    
            <param name="loggerName">LightstreamerLogger.TickJMSDataAdapter</param>
            <param name="msgPoolSize">15</param>
            <param name="recoveryPauseMillis">2000</param>
    
    
            <param name="jmsUrl">jnp://localhost:1099</param>
            <param name="initialContextFactory">org.jnp.interfaces.NamingContextFactory</param>
            <param name="topicConnectionFactory">ConnectionFactory</param>
            <param name="queueConnectionFactory">ConnectionFactory</param>
            <param name="topicName">topic/tickTopic</param>
            <param name="queueName">queue/tickconfQueue</param>
    
        </data_provider>
    
    
    
    </adapters_conf>
    When i run the LS and my feed (which is in jboss, jms queues/topics also in jboss). Only the ticks adapter is working, but the manual bets adapter receives client request for subscription but does not send the subscription request through the jms to the manualBetsFeeder :
    Code:
    15.Sep.09 15:04:01,932 < WARN> Adapters configuration not found in /home/mnenchev/tools/Lightstreamer/conf/../adapters/StockQuotesJMSAdapter
    15.Sep.09 15:04:01,942 < WARN> Adapters configuration not found in /home/mnenchev/tools/Lightstreamer/conf/../adapters/HelloWorld
    15.Sep.09 15:04:01,983 < INFO> Loading Metadata Adapter CLAIRE
    15.Sep.09 15:04:01,984 < INFO> Loading Data Adapter CLAIRE.TICKSJMSADAPTER
    15.Sep.09 15:04:01,986 < INFO> Loading Data Adapter CLAIRE.MANUALBETSJMSADAPTER
    15.Sep.09 15:04:01,995 < INFO> jmsUrl: jnp://localhost:1099
    15.Sep.09 15:04:01,995 < INFO> initialContextFactory: org.jnp.interfaces.NamingContextFactory
    15.Sep.09 15:04:01,995 < INFO> topicConnectionFactory: ConnectionFactory
    15.Sep.09 15:04:01,995 < INFO> queueConnectionFactory: ConnectionFactory
    15.Sep.09 15:04:01,995 < INFO> Finished loading Metadata Adapter CLAIRE
    15.Sep.09 15:04:01,995 < INFO> topicName: topic/tickTopic
    15.Sep.09 15:04:01,995 < INFO> jmsUrl: jnp://localhost:1099
    15.Sep.09 15:04:01,995 < INFO> queueName: queue/tickconfQueue
    15.Sep.09 15:04:01,995 < INFO> initialContextFactory: org.jnp.interfaces.NamingContextFactory
    15.Sep.09 15:04:01,996 < INFO> topicConnectionFactory: ConnectionFactory
    15.Sep.09 15:04:01,996 < INFO> queueConnectionFactory: ConnectionFactory
    15.Sep.09 15:04:01,996 < INFO> topicName: topic/manualbetTopic
    15.Sep.09 15:04:01,996 < INFO> queueName: queue/manualbetconfQueue
    15.Sep.09 15:04:01,996 < INFO> msgPoolSize: 15
    15.Sep.09 15:04:01,996 < INFO> msgPoolSize: 15
    15.Sep.09 15:04:01,996 < INFO> recoveryPauseMillis: 2000
    15.Sep.09 15:04:01,996 < INFO> recoveryPauseMillis: 2000
    15.Sep.09 15:04:01,997 < INFO> Configuration read.
    15.Sep.09 15:04:01,996 < INFO> Configuration read.
    15.Sep.09 15:04:01,999 < INFO> JMSHandler Ready
    15.Sep.09 15:04:01,999 < INFO> JMSHandler Ready
    15.Sep.09 15:04:02,000 < INFO> BaseJMSDataAdapter ready.
    15.Sep.09 15:04:02,001 < INFO> Finished loading Data Adapter CLAIRE.MANUALBETSJMSADAPTER
    15.Sep.09 15:04:02,001 < INFO> BaseJMSDataAdapter ready.
    15.Sep.09 15:04:02,001 < INFO> Finished loading Data Adapter CLAIRE.TICKSJMSADAPTER
    15.Sep.09 15:04:02,023 < INFO> JNDI Context[{jnp.parsedName=, java.naming.provider.url=localhost:1099, java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory}]...
    15.Sep.09 15:04:02,023 < INFO> Looking up topic connection factory [ConnectionFactory]...
    15.Sep.09 15:04:02,084 < INFO> Pump pool size set by default at 4
    15.Sep.09 15:04:02,210 < INFO> Events pool size set by default at 4
    15.Sep.09 15:04:02,220 < INFO> Lightstreamer Server 3.5 build 1428.4 starting...
    15.Sep.09 15:04:02,298 < INFO> Server "Lightstreamer HTTP Server" listening to *:8888 ...
    15.Sep.09 15:04:02,449 < INFO> Looking up topic [topic/tickTopic]...
    15.Sep.09 15:04:03,076 < INFO> Topic connection created
    15.Sep.09 15:04:03,345 < INFO> Topic session created
    15.Sep.09 15:04:03,449 < INFO> Topic subscriber created
    15.Sep.09 15:04:03,465 < INFO> Topic connection started
    15.Sep.09 15:04:03,465 < INFO> Looking up queue connection factory [ConnectionFactory]...
    15.Sep.09 15:04:03,468 < INFO> Looking up queue [queue/tickconfQueue]...
    15.Sep.09 15:04:03,498 < INFO> Queue connection created
    15.Sep.09 15:04:03,499 < INFO> Queue session created
    15.Sep.09 15:04:03,553 < INFO> Queue sender created
    15.Sep.09 15:04:03,557 < INFO> Text message pool created
    15.Sep.09 15:04:03,557 < INFO> ------------------>Write LOCK 3
    15.Sep.09 15:04:03,557 < INFO> JMS is now up
    15.Sep.09 15:04:03,557 < INFO> ------------------>Write UNLOCK 3
    15.Sep.09 15:04:03,768 < INFO> Received message
    15.Sep.09 15:04:03,770 < INFO> ------------------>Write LOCK 6
    15.Sep.09 15:04:03,770 < INFO> lastHeartbeatRandom is -1
    15.Sep.09 15:04:03,770 < INFO> Received NEW heartbeat: 3555, feed is now available
    15.Sep.09 15:04:03,770 < INFO> Subscribing all to the Generator
    15.Sep.09 15:04:03,771 < INFO> ------------------>Write UNLOCK 6
    15.Sep.09 15:04:03,771 < INFO> Dispatch thread started
    15.Sep.09 15:04:03,772 < INFO> ------------------>Read LOCK 8
    15.Sep.09 15:04:03,772 < INFO> Sending message: reset
    15.Sep.09 15:04:03,831 < INFO> Message dispatched to JMS: reset
    15.Sep.09 15:04:03,831 < INFO> ------------------>Read UNLOCK 8
    15.Sep.09 15:04:03,832 < INFO> Dispatch thread ends
    15.Sep.09 15:04:04,012 < INFO> Serving request: /lightstreamer/create_session.js?LS_phase=3613&LS_domain=sofiamn&LS_polling=true&LS_polling_millis=0&LS_idle_millis=30000&LS_client_version=4.3&LS_adapter=CLAIRE&LS_old_session=Sdcc0dddd28da58beT5450466& from 192.168.2.105:36571
    15.Sep.09 15:04:04,049 < INFO> Starting new session: Saa47f07970cf4707T0404039 from 192.168.2.105:36571
    15.Sep.09 15:04:04,118 < INFO> Serving request: /lightstreamer/control.html?LS_session=Saa47f07970cf4707T0404039&LS_window=3&LS_win_phase=8&LS_op=add&LS_req_phase=75&LS_mode1=DISTINCT&LS_id1=manualbet&LS_schema1=message&LS_data_adapter1=MANUALBETSJMSADAPTER&LS_snapshot1=true&LS_unique=3 from 192.168.2.105:36571
    15.Sep.09 15:04:04,127 < INFO> Controlling session: Saa47f07970cf4707T0404039 from 192.168.2.105:36571
    15.Sep.09 15:04:04,147 < INFO> Serving request: /lightstreamer/STREAMING_IN_PROGRESS?LS_session=Saa47f07970cf4707T0404039&LS_phase=3614&LS_domain=sofiamn& from 192.168.2.105:36572
    15.Sep.09 15:04:04,149 < INFO> Attaching session: Saa47f07970cf4707T0404039 from 192.168.2.105:36572
    15.Sep.09 15:04:04,169 < INFO> Subscribing to manualbet
    15.Sep.09 15:04:04,169 < INFO> (Subscribing) Valid item: manualbet
    15.Sep.09 15:04:04,170 < INFO> ------------------>Write LOCK 1
    15.Sep.09 15:04:04,170 < INFO> Inactive flag dispatched: manualbet (1)
    15.Sep.09 15:04:04,170 < INFO> ------------------>Write UNLOCK 1
    15.Sep.09 15:04:04,170 < INFO> (Subscribing) Inserted in subscribed items list: manualbet (1)
    15.Sep.09 15:04:04,759 < INFO> Received message
    15.Sep.09 15:04:04,760 < INFO> ------------------>Write LOCK 6
    .....
    What disturbs me is that in my jboss i start 2 threads(each one like the demo Generator.java - maintain the jms heartbeat and so on) one for every adapter. But in the ls log i receive the heartbeat from one "generator" i.e.:
    In jboss i have two generators running with heartbeats:
    ...
    15:03:46,737 INFO [SLGenerator] Heartbeat sent: 2940
    15:03:47,736 INFO [SLGenerator] Heartbeat sent: 3555
    15:03:47,740 INFO [SLGenerator] Heartbeat sent: 2940
    .....

  2. #2
    Senior Member
    Join Date
    Jul 2009
    Location
    sofia
    Posts
    40
    and in LS server i receive:
    .....
    15.Sep.09 15:04:07,761 < INFO> lastHeartbeatRandom is 3555
    15.Sep.09 15:04:07,761 < INFO> Received heartbeat: 3555
    15.Sep.09 15:04:07,761 < INFO> ------------------>Write UNLOCK 6
    15.Sep.09 15:04:07,772 < INFO> ------------------>Write LOCK 7
    15.Sep.09 15:04:07,773 < INFO> ------------------>Write UNLOCK 7
    15.Sep.09 15:04:08,761 < INFO> Received message
    15.Sep.09 15:04:08,762 < INFO> ------------------>Write LOCK 6
    15.Sep.09 15:04:08,762 < INFO> lastHeartbeatRandom is 3555
    ......

    But since i have two adapters (two different instances of the classes that extends my baseJMSadapter) i do not see why this is happeinng. The strange thing is that
    15.Sep.09 15:04:04,170 < INFO> Inactive flag dispatched: manualbet (1) - lastHeartBeatRandom = -1 ?!?! And for that reason no jms message is send to the feeder, and that is happening only for ManualBetJMSAdapter, not for the other. Also if i remove the TickJMSAdapter from the adapters.xml ManualBetJMSAdapter is working properly.

    Here is my LightStreamerEngine (like you call it Generator.java)
    Code:
    package com.trinitas.ls.feed;
    
    import java.util.HashMap;
    import java.util.Properties;
    import java.util.Random;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.TextMessage;
    import javax.naming.NamingException;
    
    import org.apache.log4j.Logger;
    import org.apache.log4j.xml.DOMConfigurator;
    
    import com.trinitas.common.jms.ConnectionLoop;
    import com.trinitas.common.jms.ExtendedMessageListener;
    import com.trinitas.common.jms.FeedMessage;
    import com.trinitas.common.jms.HeartbeatMessage;
    import com.trinitas.common.jms.JMSHandler;
    import com.trinitas.common.jms.SubscribedItemAttributes;
    
    public class LightstreamerEngine extends Thread implements ExtendedMessageListener, ExternalFeedListener {
    
    	private Random randomGen = new Random();
    	@Override
    	public void run() {
    		logger.info("generator is starting. Loading configuration...");
    		myFeed.start();
    		// start the loop that tries to connect to JMS
    		new ConnectionLoopTPQR(jmsHandler, recoveryPause, logger).start();
    		final HeartbeatThread heartneat = new HeartbeatThread();
    		heartneat.start();
    		try {
    			// for test wait forever
    			heartneat.join();
    		} catch (final Exception e) {
    			e.printStackTrace();
    		}
    		logger.info("Generator ready.");
    	}
    
    	/**
    	 * This object handles communications with JMS. Hides the use of Session,
    	 * Connections, Publishers etc..
    	 */
    	private JMSHandler jmsHandler;
    
    	/**
    	 * This Map contains info about the subscribed items.
    	 */
    	private HashMap<String, SubscribedItemAttributes> subscribedItems = new HashMap<String, SubscribedItemAttributes>();
    
    	private ExternalFeeder myFeed;
    
    	private int msgPoolSize;
    	private int recoveryPause;
    
    	/**
    	 * A random id that represents the life of this generator. It is sent within
    	 * the heartbeat to let Lightstreamer distinguish between 2 different lives
    	 * of the Generator. In production scenarios this would be probably
    	 * substituted with something more secure (i.e. it's not impossible to get 2
    	 * identical randoms for 2 different Generator's lives).
    	 */
    	private int random = -1;
    	
    	public LightstreamerEngine(String logConf, String providerURL, String initialContextFactory, String topicConnectionFactory, String queueConnectionFactory,
    			String topic, String queue, int msgPoolSize, int recoveryPause, ExternalFeeder feeder) {
    		this.msgPoolSize = msgPoolSize;
    		this.recoveryPause = recoveryPause;
    		while (random == -1) {
    			random = randomGen.nextInt(5000);
    		}
    		// configure a log4j logger
    		 configureLogger(logConf);
    		// instantiate a JMSHandler
    		jmsHandler = new JMSHandler(logger, initialContextFactory, providerURL, queueConnectionFactory, queue, topicConnectionFactory,
    				topic);
    		// This Generator will be the JMS listener
    		jmsHandler.setListener(this);
    
    		// instantiate and start the simulator. This is the object that
    		// "produce" data
    		myFeed = feeder;
    		// This Generator will be the listener
    		myFeed.setFeedListener(this);
    	}
    
    	// ///// MessageListener
    
    	private static final String messageNoComp = "Message received was not compatible with this process. Maybe someone else sending messages? ";
    	private static final String subUnexItem = "(Subscribing) Unexpected item: ";
    	private static final String unsubUnexItem = "(Unsubscribing) Unexpected item: ";
    	private static final String unsubUnexHandle = "(Unsubscribing) Unexpected handle for item: ";
    
    	/**
    	 * receive messages from JMSHandler
    	 */
    	public void onMessage(Message message) {
    		String feedMsg = null;
    		logger.info("Message received: processing...");
    		try {
    			// pull out text from the Message object
    			final TextMessage textMessage = (TextMessage) message;
    			feedMsg = textMessage.getText();
    			logger.info("Message:TextMessage received: " + feedMsg);
    		} catch (final ClassCastException cce) {
    			// if message isn't a TextMessage then this update is not "correct"
    			logger.warn(messageNoComp + "(ClassCastException)");
    			return;
    		} catch (final JMSException jmse) {
    			logger.error("Generator.onMessage - JMSException: " + jmse.getMessage());
    			return;
    		}
    
    		String itemName = null;
    		String handleId = null;
    		if (feedMsg != null) {
    			logger.info("Recived message: " + feedMsg);
    			if (feedMsg.equals("reset")) {
    				reset();
    			}
    			if (feedMsg.indexOf("subscribe") == 0) {
    				// this is a subscribe message
    				itemName = feedMsg.substring(9, feedMsg.indexOf("_"));
    				handleId = feedMsg.substring(feedMsg.indexOf("_") + 1);
    				subscribe(itemName, handleId);
    			} else if (feedMsg.indexOf("unsubscribe") == 0) {
    				// this is a unsubscribe message
    				itemName = feedMsg.substring(11, feedMsg.indexOf("_"));
    				handleId = feedMsg.substring(feedMsg.indexOf("_") + 1);
    				unsubscribe(itemName, handleId);
    			}
    		}
    
    		if (itemName == null) {
    			// the message isn't a valid message
    			logger.warn(messageNoComp + "Message: " + feedMsg);
    		}
    	}
    
    	private void reset() {
    		synchronized (subscribedItems) {
    			subscribedItems = new HashMap<String, SubscribedItemAttributes>();
    		}
    	}
    
    	private void subscribe(String itemName, String handleId) {
    		logger.info("Subscribing " + itemName + "  (" + handleId + ")");
    
    		logger.info("(Subscribing) Valid item: " + itemName + "  (" + handleId + ")");
    		synchronized (subscribedItems) {
    			// put the item in the subscribedItems map
    			// if another subscription is already in that will be replaced
    			final SubscribedItemAttributes attr = new SubscribedItemAttributes(itemName, handleId);
    			subscribedItems.put(itemName, attr);
    		}
    		// now we ask the feed for the snapshot; our feed will insert
    		// an event with snapshot information into the normal updates flow
    		myFeed.forceSendCurrentValues(itemName);
    		logger.info("Subscribed " + itemName + "  (" + handleId + ")");
    
    	}
    
    	private void unsubscribe(String itemName, String handleId) {
    		logger.info("Unsubscribing " + itemName + "  (" + handleId + ")");
    		synchronized (subscribedItems) {
    			if (!subscribedItems.containsKey(itemName)) {
    				// here checks are useless, just try to get the item from the
    				// subscribedItems map, if not contained there is an error
    				logger.error(unsubUnexItem + itemName + "  (" + handleId + ")");
    				return;
    			}
    
    			final SubscribedItemAttributes sia = subscribedItems.get(itemName);
    			if (sia.handleId == handleId) {
    				// remove the item from the subscribedItems map.
    				subscribedItems.remove(itemName);
    			} else {
    				logger.warn(unsubUnexHandle + itemName + "  (" + handleId + ")");
    			}
    
    		}
    		logger.info("Unsubscribed " + itemName + "  (" + handleId + ")");
    	}
    
    	public void onException(JMSException arg0) {
    		// we have lost the connection to JMS
    		synchronized (subscribedItems) {
    			// empty the subscribedItems map; this way, once reconnected
    			// we are able to re-send snapshots
    			subscribedItems = new HashMap<String, SubscribedItemAttributes>();
    		}
    		// and loop to try to reconnect
    		new ConnectionLoopTPQR(jmsHandler, recoveryPause, logger).start();
    
    	}
    
    	// ///////////// ExternalFeedListener
    
    	/**
    	 * Receive update from the feeder.
    	 */
    	public void onEvent(String itemName, HashMap currentValues, boolean isSnapshot) {
    		SubscribedItemAttributes sia = null;
    		synchronized (subscribedItems) {
    			if (!subscribedItems.containsKey(itemName)) {
    				// simulator always produce all updates. Here we filter
    				// non-subscribed items
    				return;
    			}
    
    			// handle the snapshot status
    			sia = subscribedItems.get(itemName);
    
    			if (!sia.isSnapshotSent) {
    				if (!isSnapshot) {
    					// we ignore the update and keep waiting until
    					// a full snapshot for the item has been received
    					return;
    				}
    				// this is the snapshot update
    				sia.isSnapshotSent = true;
    			} else {
    				if (isSnapshot) {
    					// it's not the first event we have received carrying
    					// snapshot information for the item; so, this event
    					// is not a snapshot from Lightstreamer point of view
    					isSnapshot = false;
    				}
    			}
    		}
    
    		// prepare the object to send through JMS
    		final FeedMessage toSend = new FeedMessage(itemName, currentValues, isSnapshot, sia.handleId, this.random);
    		try {
    			// publish the update to JMS
    			jmsHandler.publishMessage(toSend);
    		} catch (final JMSException je) {
    			logger.error("Unable to send message - JMSException:" + je.getMessage());
    		}
    
    	}
    
    	// /////////// Utils
    
    	private static String noConf = "Please specify a valid configuration file as parameter.\nProcess exits.\n";
    	private static String noParam = " is missing.\nProcess exits";
    	private static String useDefault = " is missing. Using default.";
    	private static String noLog = "Log configuration fails. Check you configuration files\nProcess exits";
    	private static String isNaN = " must be a number but it isn't. Using default.";
    
    	private static Logger logger;
    
    	public static int getParam(Properties params, String toGet, boolean required, int def) {
    		int resInt;
    		final String res = params.getProperty(toGet);
    		if (res == null) {
    			if (required) {
    				throw new IllegalStateException(toGet + noParam);
    			} else {
    				if (logger != null) {
    					logger.warn(toGet + useDefault);
    				}
    				resInt = def;
    			}
    		} else {
    			try {
    				resInt = Integer.parseInt(res);
    			} catch (final NumberFormatException nfe) {
    				if (logger != null) {
    					logger.error(toGet + isNaN);
    				}
    				resInt = def;
    			}
    		}
    
    		if (logger != null) {
    			logger.info(toGet + ": " + resInt);
    		}
    
    		return resInt;
    	}
    
    	public static String getParam(Properties params, String toGet, boolean required, String def) {
    		String res = params.getProperty(toGet);
    		if (res == null) {
    			if (required) {
    				throw new IllegalStateException(toGet + noParam);
    			} else {
    				if (logger != null) {
    					logger.warn(toGet + useDefault);
    				}
    				res = def;
    			}
    		}
    
    		if (logger != null) {
    			logger.info(toGet + ": " + res);
    		}
    
    		return res;
    	}
    
    	public static void configureLogger(String logConf) {
    		if (logConf != null) {
    			try {
    				DOMConfigurator.configureAndWatch(logConf, 10000);
    				logger = Logger.getLogger("SLGenerator");
    			} catch (final Exception ex) {
    				ex.printStackTrace();
    				System.out.println(ex.getMessage());
    				throw new IllegalStateException(noLog);
    			}
    		} else {
    			System.out.println(noLog);
    			throw new IllegalStateException(noLog);
    		}
    	}
    
    	// //////////////////// RecoveryThread
    
    	private class ConnectionLoopTPQR extends ConnectionLoop {
    
    		public ConnectionLoopTPQR(JMSHandler jmsHandler, int recoveryPause, Logger logger) {
    			super(jmsHandler, recoveryPause, logger);
    		}
    
    		@Override
    		protected void onConnectionCall() {
    			// nothing to do
    			return;
    		}
    
    		@Override
    		protected void connectionCall() throws JMSException, NamingException {
    			// initialize TopicPublisher and QueueReceiver
    			jmsHandler.initTopicPublisher(msgPoolSize);
    			jmsHandler.initQueueReceiver();
    		}
    
    	}
    
    	// //////////////////// HeartbeatThread
    
    	private class HeartbeatThread extends Thread {
    
    		private HeartbeatMessage fixedMessage = new HeartbeatMessage(random);
    
    		@Override
    		public void run() {
    			while (true) {
    				try {
    					// publish the update to JMS
    					jmsHandler.publishMessage(fixedMessage);
    				} catch (final JMSException je) {
    					logger.error("Unable to send message - JMSException:" + je.getMessage());
    				}
    				logger.info("Heartbeat sent: " + fixedMessage.random);
    
    				try {
    					Thread.sleep(1000);
    				} catch (final InterruptedException e) {
    				}
    			}
    		}
    	}
    
    }
    And here is how i start it in my jboss managed bean:
    Code:
    private void initLightStreamer() throws IOException {
    		final InputStream manualConfig = BLogicService.class.getResourceAsStream(MANUAL_BET_LIGHT_STREAMER_JMS_CONFIG_FILE);
    		final Properties params = new Properties();
    		params.load(manualConfig);
    		final LightstreamerEngine manualBetsEngine = new LightstreamerEngine(LightstreamerEngine.getParam(params, "logConf", true, null), LightstreamerEngine.getParam(params, "jmsUrl", true, null),
    				LightstreamerEngine.getParam(params, "initialContextFactory", true, null), LightstreamerEngine.getParam(params,
    						"topicConnectionFactory", true, null), LightstreamerEngine.getParam(params, "queueConnectionFactory", true, null),
    				LightstreamerEngine.getParam(params, "topicName", true, null), LightstreamerEngine
    						.getParam(params, "queueName", true, null), LightstreamerEngine.getParam(params, "msgPoolSize", false, 15),
    				LightstreamerEngine.getParam(params, "recoveryPauseMillis", false, 2000), ManualBetFeeder.getInstance());
    		manualBetsEngine.start();
    		
    		
    		final InputStream ticksConfig = BLogicService.class.getResourceAsStream(TICKS_LIGHT_STREAMER_JMS_CONFIG_FILE);
    		final Properties params1 = new Properties();
    		params1.load(ticksConfig);
    		final LightstreamerEngine tickEngine = new LightstreamerEngine(LightstreamerEngine.getParam(params1, "logConf", true, null), LightstreamerEngine.getParam(params1, "jmsUrl", true, null),
    				LightstreamerEngine.getParam(params1, "initialContextFactory", true, null), LightstreamerEngine.getParam(params1,
    						"topicConnectionFactory", true, null), LightstreamerEngine.getParam(params1, "queueConnectionFactory", true, null),
    				LightstreamerEngine.getParam(params1, "topicName", true, null), LightstreamerEngine
    						.getParam(params1, "queueName", true, null), LightstreamerEngine.getParam(params1, "msgPoolSize", false, 15),
    				LightstreamerEngine.getParam(params1, "recoveryPauseMillis", false, 2000), TickFeeder.getInstance());
    		tickEngine.start();
    	}
    I know that this does not exactly connected to LS but pls help.
    P.S.:Excuse me for posting this in new thread.

  3. #3
    Administrator
    Join Date
    Jul 2006
    Location
    Milan
    Posts
    975
    Let's concentrate on the Data Adapter side.
    I see that the two adapter initialization phases log in parallel, until the following line:
    Finished loading Data Adapter ......
    Then, the two JMS connection phases should start, deferred to different threads, through
    new ConnectionLoopTSQS(jmsHandler, recoveryPause, logger).start();
    but only one of them produces some log.

    The ConnectionLoop code is not shown.
    Assuming that it is the same as in our demo code,
    I notice that there is a static member, called phase in it,
    that, unfortunately, both of us missed when we analyzed the code for reentrancy.
    The member is remembered across subsequent instantiations of ConnectionLoop,
    in order to manage the recovery in some way.
    Hence, its behavior may be wrong if multiple parallel instantiations of ConnectionLoop are performed.
    This is likely the cause of the problem; note that the same may apply to the generator side.

    I'm afraid the ConnectionLoop class has to be revised or reimplemented for your use case.
    Please, try; we can't go that far.

    Anyway, I suggest you adding %t to your ConversionPattern in the Server log configuration file,
    in order to get thread information in the log.

  4. #4
    Senior Member
    Join Date
    Jul 2009
    Location
    sofia
    Posts
    40

    Great! I will try it, but it seems that this could be the problem. I will try it.
    One more question. What exactly happen when a client subscribes to 'item1" that has schema ={"property1","property2","property3","property4 "} and onItemUpdate

    we have some custom logic:

    if (itemUpdate.isValueChanged("property1")) {
    var msg = itemUpdate.getNewValue("property3");
    doSomethingWithProperty(msg);
    }

    if (itemUpdate.isValueChanged("property2") && clientNeedsProperty2) {
    var msg = itemUpdate.getNewValue("property3");
    doSomethingWithProperty(msg);
    }
    if (itemUpdate.isValueChanged("property3") && clientNeedsProperty3) {
    var msg = itemUpdate.getNewValue("property3");
    doSomethingWithProperty(msg);
    }

    if (itemUpdate.isValueChanged("property4") && clientNeedsProperty4) {
    var msg = itemUpdate.getNewValue("property4");
    doSomethingWithProperty(msg);
    }

    So clientNeedsProperty* are boolean variables, that defines whether the client needs this properties or not. I do not want to send data on the client if not necessary. I can add the conditional properties as items and use subscribe/unsubscribe (as i told you before) or i can use the above code. Is there any difference? What exactly does isValueChanged do and is it heavier to have theese conditional properties? Are their values send to the client or only a flag that tells whether the property has new value, if so is this heavy operation?

    Merci beaucoup!

  5. #5
    Administrator
    Join Date
    Jul 2006
    Location
    Milan
    Posts
    975
    You get property3 in 3 cases out of 4. I assume it's a typo.

    If only some field values change, unchanged field values are not resent to the client.
    Hence, if unneeded fields don't change very much, leaving them in the subscription schema only adds a negligible overhead.

    On the other hand, if unneeded fields change a lot, you may prefer the resubscription.
    I can't understand what you mean by "I can add the conditional properties as items";
    in my view, you could work with one item and use different subscription schemas, according with the needed fields.

  6. #6
    Senior Member
    Join Date
    Jul 2009
    Location
    sofia
    Posts
    40
    You are totally right. Up to now i am changing the group, when i should change the schema.....
    Thanks for the answers.

 

 

Similar Threads

  1. Replies: 1
    Last Post: July 15th, 2011, 12:08 PM
  2. How to config 2 adapter run on 01 server?
    By tuongkha in forum Adapter APIs
    Replies: 2
    Last Post: March 18th, 2008, 10:11 AM
  3. Alpha Theory has deployed Lightstreamer
    By Alessandro in forum General
    Replies: 0
    Last Post: January 25th, 2008, 11:19 AM

Bookmarks

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •  
All times are GMT +1. The time now is 01:35 PM.