Continuous Queries

The C++ and .NET clients can initiate queries that run on the GemFire cache server and notify the client when the query results have changed. For details on the server-side setup for continuous queries, see How Continuous Querying Works in the GemFire User Guide.

Continuous Query Basics

Continuous querying provides the following features:

  • Standard GemFire native client query syntax and semantics. Continuous queries are expressed in the same language used for other native client queries. See Remote Queries.

  • Standard GemFire events-based management of CQ events. The event handling used to process CQ events is based on the standard GemFire event handling framework.

  • Complete integration with the client/server architecture. CQ functionality uses existing server-to-client messaging mechanisms to send events, so any tuning of your server-to-client messaging also tunes the messaging of your CQ events. If your system is configured for high availability, your CQs are highly available, with seamless failover in case of server failure (see High Availability for Client-to-Server Communication). If your clients are durable, you can also define any of your CQs as durable (see Durable Client Messaging).

  • Interest criteria based on data values. Continuous queries are run against the region’s entry values. For comparison, interest registration is described in Registering Interest for Entries.

  • Active query execution. Once initialized, the queries operate on new events. Events that change the query result are sent to the client immediately.
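
To illustrate how a change to a region entry can translate into a CQ event, here is a minimal, self-contained sketch. It assumes the usual CQ semantics: an entry that starts matching the query yields a create event, one that still matches yields an update, and one that stops matching yields a destroy. The names `Order`, `CqOp`, `classifyChange`, and `quantityOver30` are hypothetical and not part of the GemFire API; the real evaluation happens on the server.

```cpp
#include <functional>

// Hypothetical stand-in for a region value; not a GemFire type.
struct Order {
  int quantity;
};

// Hypothetical mirror of the operations a CQ event can report.
// None means the change does not affect the query result.
enum class CqOp { None, Create, Update, Destroy };

// Decide which CQ event (if any) an entry change would produce.
// `before` and `after` point to the entry's value before and after the
// change; nullptr means the entry did not exist or was removed.
CqOp classifyChange(const Order* before, const Order* after,
                    const std::function<bool(const Order&)>& predicate) {
  const bool matchedBefore = before != nullptr && predicate(*before);
  const bool matchesNow = after != nullptr && predicate(*after);
  if (matchesNow) return matchedBefore ? CqOp::Update : CqOp::Create;
  if (matchedBefore) return CqOp::Destroy;
  return CqOp::None;
}

// Predicate corresponding to a query such as "... WHERE quantity > 30".
inline bool quantityOver30(const Order& o) { return o.quantity > 30; }
```

In this model, an order whose quantity rises from 10 to 45 produces a create event even though the entry itself was only updated, because the event describes the change to the query result rather than to the region.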

Typical Continuous Query Lifecycle

  1. The client creates the CQ. This sets up everything for running the query and provides the client with a CqQuery object, but does not execute the CQ. At this point, the query is in a STOPPED state, ready to be closed or run.
  2. The client initiates the CQ with an API call to one of the CqQuery execute* methods. This puts the query into a RUNNING state on the client and on the server. The server remotely evaluates the query string, and optionally returns the results to the client. CqQuery execute* methods include:

    • .NET: CqQuery.Execute() and CqQuery.ExecuteWithInitialResults()
    • C++: CqQuery::execute() and CqQuery::executeWithInitialResults()
  3. A CQ Listener waits for events. When it receives events, it takes action accordingly with the data in the CqEvent.

  4. The CQ is closed by a client call to CqQuery.close. This de-allocates all resources in use for the CQ on the client and server. At this point, the cycle could begin again with the creation of a new CqQuery instance.
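
The lifecycle above can be pictured as a small state machine. The following sketch is only a model of the steps just described and assumes that invalid transitions are rejected with an exception. `CqLifecycle` and `CqState` are hypothetical names used for illustration; this is not the client's CqQuery class.

```cpp
#include <stdexcept>

// Hypothetical model of the lifecycle states described above.
enum class CqState { Stopped, Running, Closed };

class CqLifecycle {
 public:
  CqState state() const { return state_; }

  // Step 2: executing a stopped CQ puts it into the RUNNING state.
  void execute() {
    if (state_ != CqState::Stopped) {
      throw std::logic_error("execute requires a STOPPED CQ");
    }
    state_ = CqState::Running;
  }

  // Stopping a running CQ returns it to the STOPPED state.
  void stop() {
    if (state_ != CqState::Running) {
      throw std::logic_error("stop requires a RUNNING CQ");
    }
    state_ = CqState::Stopped;
  }

  // Step 4: closing releases the CQ. In this sketch close is allowed
  // from any state and is idempotent (an assumption of the model).
  void close() { state_ = CqState::Closed; }

 private:
  // Step 1: a newly created CQ starts out stopped.
  CqState state_ = CqState::Stopped;
};
```

Once a `CqLifecycle` is closed, `execute()` throws in this model, which matches the note that a new cycle begins with a new CqQuery instance.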

Executing a Continuous Query from the Client

The essential steps to create and execute a continuous query are:

  1. Create an instance of the QueryService class. If you are using the pool API (recommended), you should obtain the QueryService from the pool.
  2. Define a CQ Listener (an ICqListener in .NET, a CqListener in C++) to handle events sent from the server.
  3. Use one of the CqQuery execute* methods to submit the query string to the cache server.
  4. The server remotely evaluates the query string, then monitors those results and notifies the client if they change.
  5. The client listens for changes that match the query predicate.
  6. Iterate through the returned objects.
  7. When finished, close down the continuous query.

.NET Continuous Query Example

These C# code excerpts are from the examples\dotnet\continuousquery example included in your client distribution. See the example for full context.

Following the steps listed above:

  1. Create a query service:

    var queryService = pool.GetQueryService();
    
  2. Define an ICqListener:

    public class MyCqListener<TKey, TResult> : ICqListener<TKey, TResult>
    {
        // ... (listener methods elided; see the example for the full class)
    }
    
  3. Create an instance of your ICqListener and insert it into a CQ attributes object:

    var cqListener = new MyCqListener<string, Order>();
    var cqAttributesFactory = new CqAttributesFactory<string, Order>();
    cqAttributesFactory.AddCqListener(cqListener);
    var cqAttributes = cqAttributesFactory.Create();
    
  4. Create a Continuous Query using the query service and the CQ attributes:

    var query = queryService.NewCq("MyCq", "SELECT * FROM /example_orderobject WHERE quantity > 30", cqAttributes, false);
    
  5. Execute the query:

    query.Execute();
    
  6. Wait for events and do something with them.

    /* Excerpt from the CqListener */
    
    /* Determine Operation Type */
    switch (ev.getQueryOperation())
    {
        case CqOperation.OP_TYPE_CREATE:
            operationType = "CREATE";
            break;
        case CqOperation.OP_TYPE_UPDATE:
            operationType = "UPDATE";
            break;
        case CqOperation.OP_TYPE_DESTROY:
            operationType = "DESTROY";
            break;
        default:
            break;
    }
    
    ...
    
    /* Take action based on OP Type */
    
    
  7. When finished, stop and close the query, then close the cache.

    query.Execute();
    // ... (respond to events as they arrive)
    
    query.Stop();
    query.Close();
    
    cache.Close();
    

C++ Continuous Query Example

These C++ code excerpts are from the examples/cpp/continuousquery example included in your client distribution. See the example for full context.

Following the steps listed above:

  1. Create a query service:

    auto queryService = pool->getQueryService();
    
  2. Define a CqListener:

    class MyCqListener : public CqListener {
      // ... (listener methods elided; see the example for the full class)
    };
    
  3. Create an instance of your CqListener and insert it into a CQ attributes object:

    CqAttributesFactory cqFactory;
    
    auto cqListener = std::make_shared<MyCqListener>();
    
    cqFactory.addCqListener(cqListener);
    auto cqAttributes = cqFactory.create();
    
  4. Create a Continuous Query using the query service and the CQ attributes:

    auto query = queryService->newCq(
        "MyCq", "SELECT * FROM /custom_orders c WHERE c.quantity > 30",
        cqAttributes);
    
  5. Execute the query:

    query->execute();
    
  6. Wait for events and do something with them.

    /* Excerpt from the CqListener */
    
    /* Determine Operation Type */
    switch (cqEvent.getQueryOperation()) {
    case CqOperation::OP_TYPE_CREATE:
      opStr = "CREATE";
      break;
    case CqOperation::OP_TYPE_UPDATE:
      opStr = "UPDATE";
      break;
    case CqOperation::OP_TYPE_DESTROY:
      opStr = "DESTROY";
      break;
    default:
      break;
    }
    
    ...
    
    /* Take action based on OP Type */
    
    
  7. When finished, stop and close the query, then close the cache.

    query->execute();
    
    // ... (respond to events as they arrive)
    
    query->stop();
    query->close();
    
    cache.close();
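
The "take action" part of step 6 depends on your application. As one possible illustration, the sketch below keeps a local copy of the query result in step with incoming events. It is self-contained and uses hypothetical stand-ins (`Op`, `Event`, `opName`, `ResultMirror`) instead of the client's CqEvent and CqOperation types, so treat it as a pattern rather than as code from the example.

```cpp
#include <map>
#include <string>

// Hypothetical stand-ins; the real client delivers a CqEvent whose
// operation is a CqOperation value.
enum class Op { Create, Update, Destroy };

struct Event {
  Op op;
  std::string key;
  int quantity;  // new value; ignored for Destroy
};

// Human-readable label for an operation, as in the excerpt above.
inline const char* opName(Op op) {
  switch (op) {
    case Op::Create:  return "CREATE";
    case Op::Update:  return "UPDATE";
    case Op::Destroy: return "DESTROY";
  }
  return "UNKNOWN";
}

// Keeps a local copy of the query result set up to date.
class ResultMirror {
 public:
  void onEvent(const Event& ev) {
    switch (ev.op) {
      case Op::Create:
      case Op::Update:
        rows_[ev.key] = ev.quantity;  // insert or overwrite the entry
        break;
      case Op::Destroy:
        rows_.erase(ev.key);  // entry no longer matches the query
        break;
    }
  }

  const std::map<std::string, int>& rows() const { return rows_; }

 private:
  std::map<std::string, int> rows_;
};
```

In a real listener, the body of `onEvent` would sit inside the listener's event callback, reading the key and new value from the event it receives.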