Writing the CQ Listener or CQ Status Listener
The following C++ and C# .NET examples show how you might program a simple CqListener
or CqStatusListener
to update a display screen based on the CQ events it receives.
The listener retrieves the queryOperation
and entry key and value from the CqEvent
, then updates the screen according to the operation type provided in queryOperation
.
CQ events do not change your client cache. They are provided as an event service only. This allows you to have any collection of CQs without storing large amounts of data in your regions. If you need to persist information from CQ events, program your listener to store the information where it makes the most sense for your application.
Be very careful if you choose to update your cache from your CqListener
. If your listener updates the region that is queried in its own CQ, the update may be forwarded to the server. If the update on the server satisfies the same CQ, it may be returned to the same listener that did the update, which could put your application into an infinite loop. This same scenario could be played out with multiple regions and multiple CQs if the listeners are programmed to update each other’s regions.
CqListener Implementation (C++)
// CqListener class
// Example listener that refreshes a trade display from the CQ events it
// receives. The ". . ." comments mark where application-specific display
// code belongs.
class TradeEventListener : public CqListener {
 public:
  // Called for each CQ event. Reads the query operation, the entry key and
  // the new value from the event, then branches on the operation type.
  void onEvent(const CqEvent& cqEvent) {
    // Operation associated with the query op
    CqOperation::CqOperationType queryOperation = cqEvent.getQueryOperation();
    // key and new value from the event
    CacheableKeyPtr key = cqEvent.getKey();
    TradeOrderPtr tradeOrder = dynCast<TradeOrderPtr>(cqEvent.getNewValue());
    if (queryOperation == CqOperation::OP_TYPE_UPDATE) {
      // update data on the screen for the trade order
      // . . .
    } else if (queryOperation == CqOperation::OP_TYPE_CREATE) {
      // add the trade order to the screen
      // . . .
    } else if (queryOperation == CqOperation::OP_TYPE_DESTROY) {
      // remove the trade order from the screen
      // . . .
    }
  }

  // Called when the CQ reports an error event.
  void onError(const CqEvent& cqEvent) {
    // handle the error
  }

  // Called when the listener is closed.
  void close() {
    // close the output screen for the trades
    // . . .
  }
};
CqListener Implementation (C# .NET)
// CqListener class
// Example listener that refreshes a trade display from the CQ events it
// receives. The class is generic on the key and result types because its
// callbacks take CqEvent<TKey, TResult>.
public class TradeEventListener<TKey, TResult> : ICqListener<TKey, TResult> {
  // Called for each CQ event. Reads the query operation, the entry key and
  // the new value from the event, then branches on the operation type.
  public void OnEvent(CqEvent<TKey, TResult> ev) {
    // Operation associated with the query op
    CqOperationType queryOperation = ev.getQueryOperation();
    // key and new value from the event
    TKey key = ev.getKey();
    // "as" yields null when the runtime type is not the requested one
    CacheableString keyStr = key as CacheableString;
    TResult val = ev.getNewValue();
    TradeOrder tradeOrder = val as TradeOrder;
    if (queryOperation == CqOperationType.OP_TYPE_UPDATE) {
      // update data on the screen for the trade order
      // . . .
    }
    else if (queryOperation == CqOperationType.OP_TYPE_CREATE) {
      // add the trade order to the screen
      // . . .
    }
    else if (queryOperation == CqOperationType.OP_TYPE_DESTROY) {
      // remove the trade order from the screen
      // . . .
    }
  }

  // Called when the CQ reports an error event.
  public void OnError(CqEvent<TKey, TResult> ev) {
    // handle the error
  }

  // From CacheCallback
  public void Close() {
    // close the output screen for the trades
    // . . .
  }
}
Writing a CqStatusListener
If you need your CQs to detect whether they are connected to any of the servers that host their subscription queues, implement a CqStatusListener
instead of a CqListener
.
CqStatusListener
extends the current CqListener
, allowing a client to detect when a CQ is connected to or disconnected from the server(s). The onCqConnected()
method will be invoked when the CQ is connected, and when the CQ has been reconnected after being disconnected. The onCqDisconnected()
method will be invoked when the CQ is no longer connected to any servers.
Taking the examples from above, we can instead implement a CqStatusListener
.
When you install the CqStatusListener
, your listener will be able to detect its connection status to the servers that it is querying.
CqStatusListener Implementation (C++)
// Example CqStatusListener that counts the CQ events it receives, broken down
// by operation type, and counts how many times the CQ was reported connected
// or disconnected.
class MyCqStatusListener : public CqStatusListener {
  uint8_t m_id;
  uint32_t m_numInserts;
  uint32_t m_numUpdates;
  uint32_t m_numDeletes;
  uint32_t m_numEvents;
  uint32_t m_cqsConnectedCount;
  uint32_t m_cqsDisconnectedCount;

 public:
  // Read-only accessors; const so they can be called through a const
  // reference or pointer to the listener.
  uint8_t getId() const { return m_id; }
  uint32_t getNumInserts() const { return m_numInserts; }
  uint32_t getNumUpdates() const { return m_numUpdates; }
  uint32_t getNumDeletes() const { return m_numDeletes; }
  uint32_t getNumEvents() const { return m_numEvents; }
  uint32_t getCqsConnectedCount() const { return m_cqsConnectedCount; }
  uint32_t getCqsDisConnectedCount() const { return m_cqsDisconnectedCount; }

  // Stores the given id and starts every counter at zero. The initializers
  // are listed in member declaration order, which is the order in which they
  // actually run.
  MyCqStatusListener(uint8_t id)
      : m_id(id),
        m_numInserts(0),
        m_numUpdates(0),
        m_numDeletes(0),
        m_numEvents(0),
        m_cqsConnectedCount(0),
        m_cqsDisconnectedCount(0) {}

  // Increments the total event count, then the counter that matches the
  // event's query operation. Other operation types only affect the total.
  inline void updateCount(const CqEvent& cqEvent) {
    m_numEvents++;
    switch (cqEvent.getQueryOperation()) {
      case CqOperation::OP_TYPE_CREATE: {
        m_numInserts++;
        break;
      }
      case CqOperation::OP_TYPE_UPDATE: {
        m_numUpdates++;
        break;
      }
      case CqOperation::OP_TYPE_DESTROY: {
        m_numDeletes++;
        break;
      }
      default:
        break;
    }
  }

  // Tallies the event.
  void onEvent(const CqEvent& cqe) { updateCount(cqe); }

  // Error events are tallied in the same counters as regular events.
  void onError(const CqEvent& cqe) { updateCount(cqe); }

  // Nothing to release.
  void close() {}

  void onCqDisconnected() {
    // This is called when the cq loses connection with all servers.
    m_cqsDisconnectedCount++;
  }

  void onCqConnected() {
    // This is called when the cq establishes a connection with a server.
    m_cqsConnectedCount++;
  }

  // Resets every counter to zero; the id is left unchanged.
  void clear() {
    m_numInserts = 0;
    m_numUpdates = 0;
    m_numDeletes = 0;
    m_numEvents = 0;
    m_cqsConnectedCount = 0;
    m_cqsDisconnectedCount = 0;
  }
};
CqStatusListener Implementation (C# .NET)
/// <summary>
/// Example CQ status listener. It counts events and errors separately for the
/// periods before and after <see cref="failedOver"/> is called, and counts
/// how many times the CQ was reported connected or disconnected.
/// </summary>
public class MyCqStatusListener<TKey, TResult> : ICqStatusListener<TKey, TResult>
{
  #region Private members

  // Set once failedOver() is called; selects which pair of counters is bumped.
  private bool m_failedOver = false;
  private UInt32 m_eventCountBefore = 0;
  private UInt32 m_errorCountBefore = 0;
  private UInt32 m_eventCountAfter = 0;
  private UInt32 m_errorCountAfter = 0;
  private UInt32 m_CqConnectedCount = 0;
  private UInt32 m_CqDisConnectedCount = 0;

  #endregion

  #region Public accessors

  /// <summary>Creates the listener. The id argument is accepted but not used.</summary>
  public MyCqStatusListener(int id)
  {
  }

  /// <summary>Marks that a failover happened; later events and errors go to the "after" counters.</summary>
  public void failedOver()
  {
    m_failedOver = true;
  }

  /// <summary>Events received before failedOver() was called.</summary>
  public UInt32 getEventCountBefore()
  {
    return m_eventCountBefore;
  }

  /// <summary>Errors received before failedOver() was called.</summary>
  public UInt32 getErrorCountBefore()
  {
    return m_errorCountBefore;
  }

  /// <summary>Events received after failedOver() was called.</summary>
  public UInt32 getEventCountAfter()
  {
    return m_eventCountAfter;
  }

  /// <summary>Errors received after failedOver() was called.</summary>
  public UInt32 getErrorCountAfter()
  {
    return m_errorCountAfter;
  }

  /// <summary>Number of OnCqConnected callbacks received.</summary>
  public UInt32 getCqConnectedCount()
  {
    return m_CqConnectedCount;
  }

  /// <summary>Number of OnCqDisconnected callbacks received.</summary>
  public UInt32 getCqDisConnectedCount()
  {
    return m_CqDisConnectedCount;
  }

  #endregion

  /// <summary>Counts the event in the "before" or "after" bucket.</summary>
  public virtual void OnEvent(CqEvent<TKey, TResult> ev)
  {
    if (!m_failedOver)
    {
      m_eventCountBefore++;
    }
    else
    {
      m_eventCountAfter++;
    }
  }

  /// <summary>Counts the error in the "before" or "after" bucket.</summary>
  public virtual void OnError(CqEvent<TKey, TResult> ev)
  {
    if (!m_failedOver)
    {
      m_errorCountBefore++;
    }
    else
    {
      m_errorCountAfter++;
    }
  }

  /// <summary>Nothing to release.</summary>
  public virtual void Close()
  {
  }

  /// <summary>Records one more connected notification.</summary>
  public virtual void OnCqConnected()
  {
    m_CqConnectedCount++;
  }

  /// <summary>Records one more disconnected notification.</summary>
  public virtual void OnCqDisconnected()
  {
    m_CqDisConnectedCount++;
  }

  /// <summary>Resets all counters to zero; the failed-over flag is left as it is.</summary>
  public virtual void Clear()
  {
    m_CqConnectedCount = 0;
    m_CqDisConnectedCount = 0;
    m_eventCountBefore = 0;
    m_eventCountAfter = 0;
    m_errorCountBefore = 0;
    m_errorCountAfter = 0;
  }
}