Testing a shared file system for compatibility with IBM WebSphere MQ Multi-instance Queue Managers - Middleware News
The WebSphere MQ technote Testing and support statement for WebSphere MQ multi-instance queue managers
documents the requirements on a shared file system for it to be
supported by WebSphere MQ multi-instance queue managers. It also
documents the shared file systems that IBM has already tested with. This
technote follows on by describing how to focus testing of a file system
that was not listed in the previous technote, to give the best confidence
that the system will keep integrity and work successfully.
WebSphere MQ V7.0.1 introduces multi-instance
queue managers. For this you will need a shared file system on networked
storage, such as a NAS, or a cluster file system such as IBM's General
Parallel File System (GPFS).
To work reliably with WebSphere MQ, a shared file system must provide data write integrity, guarantee exclusive access to files, and release locks on failure. Further details are described in the technote Testing and support statement for WebSphere MQ multi-instance queue managers.
To gain confidence that a file system that is
unlisted in the previous technote will work successfully with WebSphere
MQ multi-instance queue managers, focus your testing as follows:
- Run amqmfsck with each option.
- Run the integrity checker application that puts and gets persistent messages under syncpoint while failing over the queue manager and ensuring that no messages get lost or corrupted. The source for this application is included at the end of this technote.
- Check that error messages are correctly written to the error logs during failover and delete the multi-instance queue manager afterwards.
These tests are described in more detail below.
Actions that cause a failover
Failover of a multi-instance queue manager can be
triggered by hardware or software failures, including networking
problems which prevent the queue manager writing to its data or log
files. To be confident that a shared file system will provide integrity
and work with a multi-instance queue manager when such a problem occurs
unexpectedly, test all possible failure scenarios. A list of actions
that would cause a failover includes:
- Shutting down the operating system including syncing the disks
- Halting the operating system without syncing the disks
- Physically pressing the server's reset button
- Physically pulling the network cable out of the server
- Physically pulling the power cable out of the server
- Physically switching the machine off
Perform these tests with each of these actions that cause a failover, and any other failover actions that are appropriate to your environment or system.
Run amqmfsck with each option
On distributed platforms other than Windows,
WebSphere MQ provides a tool called amqmfsck which can be used when
testing whether a file system is suitable for use with a multi-instance
queue manager. amqmfsck must be run on the shared file system with each
of the following options. In each case, append the location where the
shared file system is mounted to the command shown. For more
information about running amqmfsck, see the WebSphere MQ information center.
1. Testing the basic file system behavior: no parameters
On one system run:
amqmfsck
2. Testing writing to a file concurrently: -c
Run the program:
amqmfsck -c
on both machines at the same time.
3. Testing waiting for and releasing file locks: -w
Run the program:
amqmfsck -w
on both machines at the same time.
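As a rough illustration of the kind of behaviour the -w test exercises, the following C sketch takes an exclusive whole-file lock with POSIX fcntl, waiting if another process already holds it, and then releases it. It is not the implementation of amqmfsck, and the function names and the lock file path are hypothetical, chosen only for this example.

```c
#include <assert.h>
#include <fcntl.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical helper: block until an exclusive (write) lock on the */
/* whole file is granted. Returns 0 on success, -1 on error.         */
int lock_whole_file(int fd)
{
    struct flock fl = {0};
    fl.l_type   = F_WRLCK;   /* exclusive lock                 */
    fl.l_whence = SEEK_SET;
    fl.l_start  = 0;
    fl.l_len    = 0;         /* 0 means "up to end of file"    */
    return fcntl(fd, F_SETLKW, &fl) == -1 ? -1 : 0;
}

/* Hypothetical helper: release the lock taken by lock_whole_file.   */
int unlock_whole_file(int fd)
{
    struct flock fl = {0};
    fl.l_type   = F_UNLCK;
    fl.l_whence = SEEK_SET;
    fl.l_start  = 0;
    fl.l_len    = 0;
    return fcntl(fd, F_SETLK, &fl) == -1 ? -1 : 0;
}

/* Open (creating if necessary), lock, unlock and close the file.    */
/* Returns 0 if every step succeeded, -1 otherwise.                  */
int lock_cycle(const char *path)
{
    int fd;
    int rc;
    assert(path != NULL);
    fd = open(path, O_RDWR | O_CREAT, 0660);
    if (fd < 0) return -1;
    rc = lock_whole_file(fd);
    if (rc == 0) rc = unlock_whole_file(fd);
    close(fd);
    return rc;
}
```

Calling lock_cycle on a file in the shared directory from two machines at once would make the second caller wait in lock_whole_file until the first releases the lock. On a file system that releases locks on failure, killing the first machine should also let the second caller proceed.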
Running the integrity checker application during failover
1. Create a multi-instance queue manager, qm1,
specifying the queue manager's data directory and log directory on the
shared file system to be tested. For example on one machine:
crtmqm -ld /shared/logs -md /shared/data qm1
On the other machine:
addmqinf -sQueueManager -vName=qm1 -vDirectory=qm1 -vPrefix=/var/mqm -vDataPath=/shared/data/qm1
Replace /shared with the name of your shared file system to be tested and qm1 with your preferred queue manager name.
2. Start an active and a standby instance by running on each machine:
strmqm -x qm1
3. Switch over the queue manager by running on the active instance:
endmqm -is qm1
4. Check that the active instance shuts down cleanly and the standby instance becomes active by running on both machines:
dspmq -x
5. Build and link the integrity checker WebSphere
MQ client application, amqhafst, that is included at the end of this
technote. The application needs to be linked against the MQI C client
library. This application puts persistent messages under syncpoint to a
queue and then gets them back again, also under syncpoint. It
automatically reconnects to the queue manager following the failover. It
checks each message payload for corruption. It also checks that it gets
back the same number of messages that it put. This application
tolerates a transaction being backed out when the queue manager fails
over. It also uses a side queue to work out whether
MQRC_CALL_INTERRUPTED returned from a MQCMIT resulted in the unit of
work being committed or backed out.
6. Define and start a listener. One way of doing this is to define the listener under queue manager control so that it will start when a standby instance becomes active:
echo "define listener(list1) trptype(tcp) port(1414) control(qmgr)" | runmqsc qm1
echo "start listener(list1)" | runmqsc qm1
7. Create the application's queues. This application needs two queues - one to put and get messages and the other for the side message. Create these using:
echo "define ql(q)" | runmqsc qm1
echo "define ql(sideq)" | runmqsc qm1
8. Set the MQSERVER environment variable to point to the active and standby instances of the queue manager, enabling the application to connect and automatically reconnect.
export MQSERVER=SYSTEM.DEF.SVRCONN/TCP/'
Replace
9. Run the application whilst failing over the queue manager. The application is invoked with these parameters:
amqhafst qm1 q sideq uowsize iterations verbose
where:
- qm1 is the queue manager name
- q is the name of the queue to put and get messages
- sideq is the name of the queue to put and get the side message
- uowsize is the number of messages to put or get inside a single transaction
- iterations is the number of times the messages will be put and got
- verbose is set to 0 (meaning off), 1 (meaning verbose) or 2 (meaning very verbose)
Run the application with values of uowsize and iterations that are large enough to keep it running while you fail over the queue manager. Then fail over the queue manager while the application is running and ensure that the application completes successfully. Ensure that the queue manager has failed over successfully by running dspmq -x. A successful failover will have stopped the active instance and made the standby instance active.
10. After the failover is complete and the application has finished, check that there are no messages left on any of the queues using:
echo "display ql(q) curdepth" | runmqsc qm1
echo "display ql(sideq) curdepth" | runmqsc qm1
11. Repeat this test failing over the queue manager using each of the failover actions described at the top of this technote several times. Failing over the queue manager by pulling out the network cable should be performed at least 5 times. The application should complete successfully after each failover and there should be no messages left on either queue. The queue manager instance that was killed to initiate the failover might produce FFDCs when it notices the problem which triggers the failover - these are expected and can be ignored. The standby instance that becomes active should not produce any FFDCs, except for the following probes:
- AO074001
- KN673000
FFDCs with these probe ids that are generated on the standby instance that becomes active are informational and can be safely ignored.
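The way the application in step 5 resolves MQRC_CALL_INTERRUPTED can be sketched without any MQ calls. The following C fragment is a simplified model, not the application's code: the struct and function names are hypothetical, and the side message on the side queue is replaced by a plain variable. It shows only the comparison rule, which is that the transaction committed if the transaction id stored in the side message matches the id the application last tried to commit.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the side message and the local copy     */
/* of the transaction id that the application keeps.                 */
struct SideState
{
    long stored_tranid;  /* value held in the persistent side message */
    long local_tranid;   /* value the application last tried to commit */
};

enum Outcome { COMMITTED, BACKED_OUT };

/* Model a commit whose result the caller cannot see. The local id   */
/* is always incremented; the stored id is updated only if the unit  */
/* of work really committed.                                         */
void interrupted_commit(struct SideState *s, int actually_committed)
{
    assert(s != NULL);
    s->local_tranid += 1;
    if (actually_committed)
        s->stored_tranid = s->local_tranid;
}

/* Resolve the unknown outcome by comparing the two ids, which is    */
/* the role that browsing the side message plays after a failover.   */
enum Outcome resolve(const struct SideState *s)
{
    assert(s != NULL);
    return s->stored_tranid == s->local_tranid ? COMMITTED
                                               : BACKED_OUT;
}
```

Because the side message is updated inside the same unit of work as the batch, a rolled-back batch leaves the stored id unchanged, so a mismatch reliably indicates a backout.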
Check the error logs and delete the queue manager
1. Change directory to the queue manager's error logs directory on the shared file system. Check that all three queue manager error logs have been written to and they all have the correct ownerships and permissions:
cd /shared/data/qm1/errors
ls -l
This should show something like:
-rw-rw---- 1 mqm mqm 19634 13 Sep 2010 AMQERR01.LOG
-rw-rw---- 1 mqm mqm 262355 13 Sep 2010 AMQERR02.LOG
-rw-rw---- 1 mqm mqm 262174 13 Sep 15:43 AMQERR03.LOG
2. Finally, delete the queue manager. The command should complete successfully:
dltmqm qm1
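The permission part of the error log check in step 1 could also be done programmatically. The following C sketch uses POSIX stat to test whether a file has exactly the rw-rw---- permissions shown in the sample listing. The function names are hypothetical, and the sketch does not check that the owner and group are mqm, which would still need to be verified separately.

```c
#include <assert.h>
#include <stdio.h>
#include <sys/stat.h>

/* Hypothetical helper: returns 1 if path is a regular file whose    */
/* permission bits are exactly rw-rw---- (octal 0660), 0 if they     */
/* are anything else, and -1 if the file cannot be examined.         */
int has_mode_0660(const char *path)
{
    struct stat st;
    assert(path != NULL);
    if (stat(path, &st) != 0) return -1;
    if (!S_ISREG(st.st_mode)) return 0;
    return (st.st_mode & 07777) == 0660 ? 1 : 0;
}

/* Hypothetical helper: print a one-line verdict for an error log.   */
void report_error_log(const char *path)
{
    int rc = has_mode_0660(path);
    printf("%s: %s\n", path,
           rc == 1 ? "permissions ok" :
           rc == 0 ? "unexpected permissions" :
                     "cannot be examined");
}
```

For example, report_error_log could be called once for each of AMQERR01.LOG, AMQERR02.LOG and AMQERR03.LOG in the errors directory after a failover.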
Integrity checker application
/************************************************************/
/* */
/* Program name: AMQHAFST */
/* */
/* Description: Example C client program that tests a */
/* shared file system's suitability for use */
/* with WebSphere MQ multi-instance queue */
/* managers. */
/* */
/* */
/* Licensed Materials - Property of IBM */
/* */
/* 5724-H72 */
/* (c) Copyright IBM Corp. 1994, 2009 All Rights Reserved. */
/* */
/* US Government Users Restricted Rights - Use, duplication */
/* or disclosure restricted by GSA ADP Schedule Contract */
/* with IBM Corp. */
/* */
/* */
/************************************************************/
/* */
/* Function: */
/* */
/* AMQHAFST is an example C program that puts and gets */
/* persistent messages to a queue under syncpoint. It */
/* checks the number of messages it got back is the same as */
/* the number it put. It is designed to test a shared */
/* file system for use with WebSphere MQ multi-instance */
/* queue managers. When run against a multi-instance queue */
/* manager, and that queue manager is failed over whilst */
/* this program is running, this program will automatically */
/* reconnect to the new instance. This program will check */
/* for message corruption and missing messages which would */
/* point to problems with the shared file system. */
/* */
/* This program: */
/* */
/* - switches on the autoreconnect option when connecting */
/* to the queue manager so that it will automatically */
/* reconnect when the queue manager fails over. */
/* */
/* - puts and gets a batch of messages under syncpoint. */
/* */
/* - repeats putting and getting the messages over a */
/* variable number of iterations. */
/* */
/* - tolerates a unit of work being rolled back, which */
/* might happen when the queue manager fails over. */
/* */
/* - maps a reason of MQRC_CALL_INTERRUPTED to either */
/* committed or rolled back when returned from a commit, */
/* which might happen during failover. This is done by */
/* updating a separate message to a side queue inside */
/* the original transaction. If MQRC_CALL_INTERRUPTED */
/* is returned, the message put to the side queue can be */
/* browsed to see if it was updated by the previous */
/* transaction or not. */
/* */
/************************************************************/
/* */
/* AMQHAFST has the following parameters */
/* */
/* the queue manager name */
/* the name of the queue to put and get messages */
/* the name of the side queue to put and get the side */
/* message */
/* the number of messages to put or get inside a single */
/* transaction */
/* the number of times the messages will be put and got */
/* either 0 (meaning not verbose), 1 (meaning verbose) or */
/* 2 (meaning very verbose) */
/* */
/************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <cmqc.h> /* MQI definitions */
#define TRUE 1
#define FALSE 0
/* MESSAGELEN is the length of each message. */
#define MESSAGELEN 5000
/* Constants used with the verbose global variable that */
/* specify how much debug to output. */
#define NOTVERBOSE 0
#define VERBOSE 1
#define VERYVERBOSE 2
static int verbose = NOTVERBOSE;
static int hadError = FALSE;
/*************************************************************/
/* */
/* Function checkReason */
/* Checks a reason code for errors */
/* */
/* Parameters */
/* name (input) the MQI call that was executed */
/* reason (input) the reason to be checked */
/* Returns */
/* void */
/* Outputs */
/* An error message is output for unexpected reasons */
/* */
/*************************************************************/
static void checkReason(char *name, MQLONG rc)
{
int ignore = FALSE;
/***********************************************************/
/* MQRC_BACKED_OUT is not an error because it can be */
/* returned from any put, get or commit when the queue */
/* manager fails over. */
/***********************************************************/
if (MQRC_BACKED_OUT == rc) ignore = TRUE;
/***********************************************************/
/* MQRC_CALL_INTERRUPTED is not an error when returned */
/* from MQCMIT because it can be returned when the queue */
/* manager fails over. */
/***********************************************************/
if (!strcmp(name,"MQCMIT") && (MQRC_CALL_INTERRUPTED == rc))
ignore = TRUE;
/***********************************************************/
/* Output an error message for bad reasons, or output an */
/* informational message if verbose. */
/***********************************************************/
if (MQRC_NONE != rc && !ignore)
{
printf("ERROR %s rc = %ld\n", name, (long)rc);
hadError = TRUE;
}
else if (verbose)
printf("%s rc = %ld\n", name, (long)rc);
}
/*************************************************************/
/* */
/* Function handleBackedOut */
/* An MQI call might back out. If so it will return */
/* MQRC_BACKED_OUT. This can happen when the queue */
/* manager fails over. The MQI specifies that MQBACK */
/* must be called when a previous MQI call backs out. */
/* This function calls MQBACK in this situation. */
/* */
/* Parameters */
/* text (input) informational text to identify what was */
/* backed out */
/* hconn (input) connection handle */
/* rc (input) reason code returned from previous MQI */
/* call */
/* Returns */
/* TRUE if the previous MQI call backed out */
/* FALSE if the previous MQI call did not back out */
/* Outputs */
/* An informational message if a rollback occurred */
/* */
/*************************************************************/
int handleBackedOut(char *text, MQHCONN hconn, MQLONG rc)
{
MQLONG cc1;
MQLONG rc1;
if (MQRC_BACKED_OUT == rc)
{
printf("%s message was backed out\n", text);
MQBACK(hconn, &cc1, &rc1);
checkReason("MQBACK", rc1);
return TRUE;
}
return FALSE;
}
/*************************************************************/
/* struct SideInfo is the data about the side message. The */
/* side message is written to the side queue. The side */
/* message is updated inside every transaction so that if */
/* the transaction commit returns MQRC_CALL_INTERRUPTED, the */
/* side message can be browsed to determine whether the */
/* transaction committed or rolled back. There is exactly */
/* one side message written to the side queue, and this */
/* message is got by message id when it is to be updated. */
/* The transaction id in the side message is incremented */
/* when the side message is updated. The latest transaction */
/* id is kept in this side information in a local variable. */
/* After a transaction has returned call interrupted, if the */
/* local transaction id matches the transaction id in the */
/* side message, the transaction committed. If they do not */
/* match, the transaction rolled back. */
/*************************************************************/
struct SideInfo
{
/* Object handle used to access the side queue */
MQHOBJ hobj;
long tranid;
MQBYTE24 msgid;
};
/*************************************************************/
/* */
/* Function initSide */
/* Initialises the side message. Open the side queue and */
/* put the side message with a transaction id of zero. */
/* */
/* Parameters */
/* hconn (input) connection handle */
/* sidename (input) name of the side queue */
/* sideinfo (output) the side queue information */
/* Returns */
/* void */
/* */
/*************************************************************/
static void initSide(MQHCONN hconn,
char *sidename,
struct SideInfo *sideinfo)
{
MQOD od = {MQOD_DEFAULT}; /* side q object descriptor*/
MQLONG cc1;
MQLONG rc1;
MQMD md = {MQMD_DEFAULT}; /* side msg descriptor */
MQPMO pmo = {MQPMO_DEFAULT}; /* put message options */
char buffer[sizeof(long)]; /* side message buffer */
/***********************************************************/
/* Initialise the side information */
/***********************************************************/
memset(sideinfo,0,sizeof(struct SideInfo));
/***********************************************************/
/* Open the side queue for putting and getting messages */
/***********************************************************/
strncpy( od.ObjectName, sidename, (size_t)MQ_Q_NAME_LENGTH );
MQOPEN( hconn, &od,
MQOO_FAIL_IF_QUIESCING | MQOO_INPUT_SHARED |
MQOO_OUTPUT | MQOO_BROWSE,
&(sideinfo->hobj), &cc1, &rc1 );
checkReason("MQOPEN side", rc1);
/***********************************************************/
/* No need to put the side message under syncpoint here */
/* because we don't expect a failover until the */
/* application has got going. */
/***********************************************************/
pmo.Options = MQPMO_NEW_MSG_ID
| MQPMO_NEW_CORREL_ID
| MQPMO_FAIL_IF_QUIESCING
| MQPMO_NO_SYNCPOINT;
/***********************************************************/
/* Initialise the transaction id in the side message */
/***********************************************************/
sideinfo->tranid = 0;
memcpy(buffer, &(sideinfo->tranid), sizeof(long));
/***********************************************************/
/* The side message must be persistent to survive a */
/* failover */
/***********************************************************/
md.Persistence = MQPER_PERSISTENT;
if (verbose) printf("MQPUT tranid=%ld\n", sideinfo->tranid);
MQPUT( hconn,
sideinfo->hobj,
&md,
&pmo,
sizeof(long),
buffer,
&cc1,
&rc1 );
checkReason("MQPUT side", rc1);
/***********************************************************/
/* Remember the id of the side message so that it can be */
/* got by message id later. */
/***********************************************************/
memcpy(sideinfo->msgid, md.MsgId, sizeof(MQBYTE24));
}
/*************************************************************/
/* */
/* Function termSide */
/* Terminates the side message. Removes the side message */
/* and closes the side queue. */
/* */
/* Parameters */
/* hconn (input) connection handle */
/* sideinfo (input) the side queue information */
/* Returns */
/* void */
/* */
/*************************************************************/
static void termSide(MQHCONN hconn, struct SideInfo *sideinfo)
{
MQLONG cc1;
MQLONG rc1;
char buffer[sizeof(long)]; /* side message buffer */
MQMD md = {MQMD_DEFAULT}; /* side msg descriptor */
MQGMO gmo = {MQGMO_DEFAULT}; /* get message options */
MQLONG msglen = 0; /* side message length */
long tranid = 0; /* transaction id */
/***********************************************************/
/* Get the side message destructively by message id. No */
/* need to use syncpoint since we don't expect the queue */
/* manager to fail over during application termination. */
/***********************************************************/
memcpy(md.MsgId, sideinfo->msgid, sizeof(MQBYTE24));
gmo.MatchOptions = MQMO_MATCH_MSG_ID;
gmo.Options = MQGMO_FAIL_IF_QUIESCING |
MQGMO_NO_WAIT | MQGMO_NO_SYNCPOINT;
MQGET( hconn,
sideinfo->hobj,
&md,
&gmo,
sizeof(long),
buffer,
&msglen,
&cc1,
&rc1 );
checkReason("MQGET side", rc1);
/***********************************************************/
/* If verbose, output the final transaction id in an */
/* informational message. */
/***********************************************************/
memcpy(&tranid, buffer, sizeof(long));
if (verbose) printf("MQGET side tranid=%ld\n", tranid);
/***********************************************************/
/* Close the side queue */
/***********************************************************/
MQCLOSE( hconn, &sideinfo->hobj, MQCO_NONE, &cc1, &rc1);
checkReason("MQCLOSE side", rc1);
}
/*************************************************************/
/* */
/* Function didItBackout */
/* Discovers whether the previous transaction committed */
/* or rolled back by inquiring the side message. */
/* */
/* Parameters */
/* hconn (input) connection handle */
/* sideinfo (input) the side queue information */
/* Returns */
/* TRUE if the previous transaction rolled back */
/* FALSE if the previous transaction committed */
/* Outputs */
/* An informational message that indicates whether the */
/* call interrupted resolves to a commit or a roll back. */
/*************************************************************/
static int didItBackout(MQHCONN hconn,
struct SideInfo *sideinfo)
{
MQLONG cc1;
MQLONG rc1;
char buffer[sizeof(long)]; /* side message buffer */
MQMD md = {MQMD_DEFAULT}; /* side message descriptor */
MQGMO gmo = {MQGMO_DEFAULT}; /* get message options */
MQLONG msglen = 0; /* side message length */
long tranid = 0; /* transaction id */
printf("Resolving call interrupted\n");
/***********************************************************/
/* Browse the side message by message id */
/***********************************************************/
gmo.Options = MQGMO_FAIL_IF_QUIESCING | MQGMO_BROWSE_FIRST |
MQGMO_NO_WAIT;
if (gmo.Version < MQGMO_VERSION_2)
gmo.Version = MQGMO_VERSION_2;
gmo.MatchOptions = MQMO_MATCH_MSG_ID;
memcpy(md.MsgId, sideinfo->msgid, sizeof(MQBYTE24));
MQGET( hconn,
sideinfo->hobj,
&md,
&gmo,
sizeof(long),
buffer,
&msglen,
&cc1,
&rc1 );
checkReason("MQGET browse side", rc1);
/***********************************************************/
/* Compare the transaction id in the side message with the */
/* transaction id in the side information. If they are the */
/* same the transaction committed. If they are different, */
/* the transaction rolled back. */
/***********************************************************/
memcpy(&tranid, buffer, sizeof(long));
printf( "MQGET browse side tranid=%ld sideinfo->tranid=%ld\n",
tranid,
sideinfo->tranid);
if (tranid != sideinfo->tranid)
{
printf("Resolving to backed out\n");
return TRUE;
}
else
{
printf("Resolving to committed\n");
return FALSE;
}
}
/*************************************************************/
/* */
/* Function commit */
/* Commits a transaction. This function also updates the */
/* side message in the unit of work, and handles reason */
/* codes returned from the commit. */
/* */
/* Parameters */
/* hconn (input) connection handle */
/* sideinfo (input) the side queue information */
/* Returns */
/* TRUE if the previous transaction rolled back */
/* FALSE if the previous transaction committed */
/* Outputs */
/* An informational message that indicates whether the */
/* call interrupted resolves to a commit or a roll back. */
/* */
/*************************************************************/
static int commit(MQHCONN hconn, struct SideInfo *sideinfo)
{
MQLONG cc1;
MQLONG rc1;
int backedout = FALSE; /* return value */
char buffer[sizeof(long)]; /* side message buffer */
MQMD md = {MQMD_DEFAULT}; /* side message descriptor */
MQGMO gmo = {MQGMO_DEFAULT}; /* get message options */
MQLONG msglen = 0; /* side message length */
MQPMO pmo = {MQPMO_DEFAULT}; /* put message options */
long tranid = 0; /* transaction id */
/***********************************************************/
/* Get the side message in the caller's unit of work. get */
/* the message by message id */
/***********************************************************/
memcpy(md.MsgId, sideinfo->msgid, sizeof(MQBYTE24));
gmo.MatchOptions = MQMO_MATCH_MSG_ID;
gmo.Options = MQGMO_FAIL_IF_QUIESCING |
MQGMO_NO_WAIT |
MQGMO_SYNCPOINT;
MQGET( hconn,
sideinfo->hobj,
&md,
&gmo,
sizeof(long),
buffer,
&msglen,
&cc1,
&rc1 );
checkReason("MQGET side", rc1);
memcpy(&tranid, buffer, sizeof(long));
if (verbose) printf("MQGET side tranid=%ld\n", tranid);
/***********************************************************/
/* Increment the transaction id and put the side message */
/* also under the caller's unit of work. This is done so */
/* if the caller's transaction backs out, the update to */
/* the side message will also back out. */
/***********************************************************/
sideinfo->tranid = sideinfo->tranid + 1;
if (verbose)
printf("MQPUT side tranid=%ld\n", sideinfo->tranid);
memcpy(buffer, &(sideinfo->tranid), sizeof(long));
pmo.Options = MQPMO_FAIL_IF_QUIESCING | MQPMO_SYNCPOINT;
MQPUT( hconn,
sideinfo->hobj,
&md,
&pmo,
sizeof(long),
buffer,
&cc1,
&rc1 );
checkReason("MQPUT side", rc1);
/***********************************************************/
/* Commit the transaction */
/***********************************************************/
MQCMIT(hconn, &cc1, &rc1);
checkReason("MQCMIT", rc1);
/***********************************************************/
/* Call interrupted means that the transaction either */
/* committed or backed out but we don't know which. Use */
/* the side message here to find out what happened. */
/***********************************************************/
if (MQRC_CALL_INTERRUPTED == rc1 &&
didItBackout(hconn, sideinfo))
rc1 = MQRC_BACKED_OUT;
/***********************************************************/
/* Call handleBackedOut here to call MQBACK, as the MQI */
/* requires. */
/***********************************************************/
backedout = handleBackedOut("Commit", hconn, rc1);
return backedout;
}
/*************************************************************/
/* */
/* Main program */
/* */
/*************************************************************/
int main( int argc, char** argv )
{
MQOD od = {MQOD_DEFAULT}; /* object descriptor */
MQMD md = {MQMD_DEFAULT}; /* message descriptor */
MQPMO pmo = {MQPMO_DEFAULT}; /* put message options */
MQGMO gmo = {MQGMO_DEFAULT}; /* get message options */
MQHCONN hconn; /* connection handle */
MQHOBJ hobj; /* queue handle */
MQCNO connectopts = {MQCNO_DEFAULT}; /* connection opts */
MQLONG cc;
MQLONG rc;
MQLONG msglen; /* message length */
char buffer[MESSAGELEN+1]; /* message buffer */
char qmname[MQ_Q_MGR_NAME_LENGTH+1] = ""; /* q manager name */
char qname[MQ_Q_NAME_LENGTH+1] = ""; /* queue name */
char sidename[MQ_Q_NAME_LENGTH+1] = ""; /* side queue name */
int uowsize; /* unit of work size */
int iterations; /* number of iterations */
MQLONG buflen = 0; /* message buffer length */
char alphabet[]="abcdefghijklmnopqrstuvwxyz";
char msgdata[MESSAGELEN+1]; /* message payload */
int nmsg; /* message counter */
int it; /* iteration counter */
int backedout = FALSE; /* rollback flag */
struct SideInfo sideinfo; /* side queue information */
/***********************************************************/
/* Output a hello message */
/***********************************************************/
printf("fstest\n");
/***********************************************************/
/* Copy the command line arguments into local variables */
/* and output them as a check */
/***********************************************************/
if (argc < 7)
{
printf("Usage: amqhafst qmname qname sidename uowsize iterations verbose\n");
return 1;
}
strncpy(qmname, argv[1], (size_t)MQ_Q_MGR_NAME_LENGTH);
strncpy(qname, argv[2], (size_t)MQ_Q_NAME_LENGTH);
strncpy(sidename, argv[3], (size_t)MQ_Q_NAME_LENGTH);
uowsize = atoi(argv[4]);
iterations = atoi(argv[5]);
verbose = atoi(argv[6]);
printf("qmname = %s\n", qmname);
printf("qname = %s\n", qname);
printf("sidename = %s\n", sidename);
printf("uowsize = %d\n", uowsize);
printf("iterations = %d\n", iterations);
printf("verbose = %d\n", verbose);
/***********************************************************/
/* Every message put to the queue is the same length and */
/* contains the same information. Initialise a copy of the */
/* message payload up front here so we can copy it into */
/* each message and compare it with each message we get */
/* back. */
/***********************************************************/
memset(buffer,0,MESSAGELEN+1);
memset(msgdata,0,MESSAGELEN+1);
for (nmsg=0; nmsg<MESSAGELEN; nmsg++)
msgdata[nmsg] = alphabet[nmsg%strlen(alphabet)];
/***********************************************************/
/* Connect to the queue manager specifying automatic */
/* client reconnection. */
/***********************************************************/
connectopts.Version = MQCNO_VERSION_5;
connectopts.Options |= MQCNO_RECONNECT;
MQCONNX(qmname, &connectopts, &hconn, &cc, &rc );
checkReason("MQCONNX", rc);
/***********************************************************/
/* Initialise the side message here */
/***********************************************************/
initSide(hconn, sidename, &sideinfo);
/***********************************************************/
/* Open the queue that we will put and get messages to */
/***********************************************************/
strncpy( od.ObjectName, qname, (size_t)MQ_Q_NAME_LENGTH );
MQOPEN( hconn,
&od,
MQOO_FAIL_IF_QUIESCING | MQOO_INPUT_EXCLUSIVE |
MQOO_OUTPUT,
&hobj,
&cc,
&rc );
checkReason("MQOPEN", rc);
/***********************************************************/
/* Loop round iterations times putting and getting a batch */
/* of messages to the queue */
/***********************************************************/
for (it=0; it<iterations; it++)
{
printf("Iteration %d\n", it);
backedout = FALSE;
/*********************************************************/
/* Put a batch of persistent messages to the queue under */
/* syncpoint. If a rollback happens in the middle of the */
/* batch, stop putting messages because the whole batch */
/* will be ignored. */
/*********************************************************/
for (nmsg = 0;
nmsg<uowsize && !backedout;
nmsg++)
{
if (verbose) printf("Put Message %d\n", nmsg);
msglen = MESSAGELEN;
memcpy(buffer, msgdata, MESSAGELEN);
if (VERYVERBOSE==verbose)
printf("MQPUT msglen=%d buffer = %s\n",
msglen,
buffer);
pmo.Options |= MQPMO_FAIL_IF_QUIESCING;
md.Persistence = MQPER_PERSISTENT;
pmo.Options |= MQPMO_SYNCPOINT;
MQPUT( hconn,
hobj,
&md,
&pmo,
msglen,
buffer,
&cc,
&rc );
checkReason("MQPUT", rc);
/*******************************************************/
/* handleBackedOut will do nothing if the transaction */
/* did not rollback */
/*******************************************************/
backedout = handleBackedOut("Put", hconn, rc);
}
/*********************************************************/
/* If the transaction backed out already, presumably */
/* because the queue manager failed over, ignore the */
/* batch because it should have all been rolled back. */
/* The user should check that there are no messages left */
/* on any of the queues at the end of the test because */
/* that would indicate a partial transaction being */
/* rolled back here. Immediately continue with the next */
/* iteration because there's nothing to commit or get */
/* here. */
/*********************************************************/
if (backedout) continue;
/*********************************************************/
/* Commit the transaction, but if it rolls back continue */
/* with the next iteration because there'll be nothing */
/* to get. */
/*********************************************************/
backedout = commit(hconn, &sideinfo);
if (backedout) continue;
/*********************************************************/
/* Get the batch of messages from the queue under */
/* syncpoint. If a rollback happens in the middle of the */
/* batch, get all the messages in a new transaction, */
/* otherwise there'll be messages left on the queue at */
/* the end of the test. */
/*********************************************************/
do
{
backedout = FALSE;
for (nmsg = 0;
nmsg<uowsize && !backedout;
nmsg++)
{
if (verbose) printf("Get Message %d\n", nmsg);
gmo.Options = MQGMO_FAIL_IF_QUIESCING |
MQGMO_NO_WAIT | MQGMO_SYNCPOINT;
memset(buffer,0,MESSAGELEN);
MQGET( hconn,
hobj,
&md,
&gmo,
MESSAGELEN,
buffer,
&msglen,
&cc,
&rc );
checkReason("MQGET", rc);
backedout = handleBackedOut("Get", hconn, rc);
/*****************************************************/
/* If this message did not back out check that the */
/* length and payload of the message is correct and */
/* that no corruption has occurred that might imply */
/* a file system compliance issue. */
/*****************************************************/
if (!backedout)
{
if (verbose)
printf( "MQGET msglen = %ld strlen(buffer) = %lu\n",
(long)msglen,
(unsigned long)strlen(buffer));
if (VERYVERBOSE==verbose)
printf("MQGET buffer = %s\n", buffer);
if (msglen != MESSAGELEN)
printf( "ERROR bad msglen %d expected %d\n",
msglen,
MESSAGELEN);
if (memcmp(buffer,msgdata,MESSAGELEN))
{
printf("ERROR bad buffer msglen = %d\n", msglen);
printf("Expected %s\n", msgdata);
printf("Received %s\n", buffer);
}
}
} /* end for each message in batch */
/*******************************************************/
/* Only commit if there's something to commit */
/*******************************************************/
if (!backedout)
backedout = commit(hconn, &sideinfo);
}
while (backedout && !hadError);
} /* end for each iteration */
/***********************************************************/
/* Close the queue */
/***********************************************************/
MQCLOSE( hconn, &hobj, MQCO_NONE, &cc, &rc);
checkReason("MQCLOSE", rc);
/***********************************************************/
/* Remove the side message and close the side queue. */
/***********************************************************/
termSide(hconn, &sideinfo);
/***********************************************************/
/* Disconnect from the queue manager */
/***********************************************************/
MQDISC( &hconn, &cc, &rc);
checkReason("MQDISC", rc);
/***********************************************************/
/* Exit */
/***********************************************************/
printf("Finished\n");
return 0;
}
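The corruption check in the listing above compares each received buffer with a precomputed payload built from a repeating alphabet. The following C sketch isolates that idea so it can be tried without a queue manager. The function names and the PAYLOAD_LEN constant are hypothetical. Reporting the offset of the first differing byte is an addition for illustration, since the listing itself only uses memcmp to detect a difference.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Assumed to match MESSAGELEN in the listing. */
#define PAYLOAD_LEN 5000

/* Fill out[0..len-1] with the repeating pattern a..z, a..z, ...     */
void build_payload(char *out, size_t len)
{
    static const char alphabet[] = "abcdefghijklmnopqrstuvwxyz";
    size_t i;
    assert(out != NULL);
    for (i = 0; i < len; i++)
        out[i] = alphabet[i % (sizeof alphabet - 1)];
}

/* Return the offset of the first byte that differs between the      */
/* expected and received buffers, or -1 if the buffers are equal.    */
long first_mismatch(const char *expected,
                    const char *received,
                    size_t len)
{
    size_t i;
    assert(expected != NULL && received != NULL);
    if (memcmp(expected, received, len) == 0)
        return -1;
    for (i = 0; i < len; i++)
        if (expected[i] != received[i])
            return (long)i;
    return -1; /* not reached when memcmp reported a difference */
}
```

Knowing the offset of the first damaged byte can help when investigating whether a corruption lines up with a file system block or page boundary.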