!=======================================================================
! RCS Header:
! Revision [$Revision: 1.2.2.1 $] Named [$Name: release#2_9_b5 $]
! Last checkin [$Date: 2001/12/06 10:14:21 $]
! Author [$Author: frpb $]
!=======================================================================
! *****************************COPYRIGHT*******************************
! (c) CROWN COPYRIGHT 2001, Met Office, All Rights Reserved.
! Please refer to Copyright file in top level GCOM directory
! for further details
! *****************************COPYRIGHT*******************************

#include "gcg_prolog.h"

      SUBROUTINE GCG_RVECSHIFT (LVL, LSL, LSO, NV, SHFT, WRAP, FIELD,
     &     GID, ISTAT)

!     ******************************************************************
!     * Purpose:
!     *
!     *  Shift (rotate) the elements in a set of vectors distributed
!     *  across all members of a group.
!     *
!     * Input:
!     *  LVL     - Local Vector Length
!     *  LSL     - Local Shift Length (the length of the subsection to
!     *            be shifted for each vector)
!     *  LSO     - Local Shift Offset (element where the summation
!     *            start)
!     *  NV      - Number of Vectors
!     *  SHFT    - Number of Shifts to be done (positive shifts move
!     *            elements towards higher indices, negative towards
!     *            lower; SHFT=0 is a no-op and returns immediately)
!     *  WRAP    - Logical indicating whether the vectors should be
!     *            wrapped around on shifts (if .FALSE., vacated
!     *            positions keep their original values)
!     *  FIELD   - local array containing the vectors to be shifted
!     *  GID     - processor group ID (GCG__ALLGROUP for all PEs;
!     *            otherwise a group previously registered with GCOM)
!     *
!     * Output:
!     *  FIELD   - Local array containing the shifted data
!     *  ISTAT   - status of rsum.  0 is OK (MPI_SRC only),
!     *            refer to the header files for nonzero status codes
!     *
!     * NOTES:
!     *  Three build variants share this source via cpp flags:
!     *  SERIAL_SRC rotates locally; MPI_SRC and SHM_SRC gather every
!     *  PE's subsection onto the first PE of the group, rotate the
!     *  concatenated vector there, and scatter it back.  The gather/
!     *  rotate/scatter work arrays (GARR, HARR, LST, GLST, LDLEN,
!     *  MAX_ROTATE) are presumably declared in gcg__rotate_common.h
!     *  -- TODO confirm, since IMPLICIT NONE is in force and LDLEN is
!     *  not declared in this file.
!     *
!     ******************************************************************

      IMPLICIT NONE

      INTEGER LVL, LSL, LSO, NV, SHFT, GID, ISTAT
      REAL FIELD(LVL,NV)
      LOGICAL WRAP

#if defined(SHM_SRC)
      INCLUDE 'mpp/shmem.fh'
#include "gc__shm_common.h"
#include "gc__opts_common.h"
#include "gcg__grstore_common.h"
#endif

#if defined(SHM_SRC)
      INTEGER INFO, NPROC, SHMEM_SWAP
!--- LARR is a Cray-pointer alias used to address a remote PE's FIELD
!    copy whose base address was obtained via the NAM_FLAG swap below.
      REAL LARR(LVL)
      POINTER(PTR,LARR)
#include "gc__nam_flags_common.h"
#endif

#if defined(MPI_SRC)
      INCLUDE 'mpif.h'
      INTEGER STATUS(MPI_STATUS_SIZE)
#endif

      INCLUDE 'gc_com.h'

      INTEGER I, J, K, ME, GRANK, GSIZE, GRANK0, IGID, ILOC, GJ
#include "gcg__rotate_common.h"

      ISTAT = GC__OK

!--- Return if no shifts
      IF (SHFT .EQ. 0) RETURN

#if defined(SHM_SRC)
!--- Verify GID and membership, and find my rank within the group.
!    For a named group, scan the group store (GR_ID/GR_LOC/GR_SZ/
!    GR_MEM) for this GID and locate my PE in its member list;
!    GRANK is my rank within the group, GRANK0 the global id of the
!    group's first PE (the "root" that performs the rotate).
      IF (GID .EQ. GCG__ALLGROUP) THEN
         GRANK = GC_ME()
         GSIZE = GC_NPROC()
         GRANK0 = 0
         ME = GC_ME()
         IGID = 0
      ELSE IF (GID .GT. GCG__ALLGROUP) THEN
         DO IGID = 1, MAX_GROUP
            IF (GR_ID(IGID) .EQ. GID) THEN
               ILOC = GR_LOC(IGID)
               ME = GC_ME()
               DO I = ILOC, ILOC+GR_SZ(IGID)-1
                  IF (ME .EQ. GR_MEM(I)) THEN
                     GRANK = I - ILOC
                     GOTO 100
                  ENDIF
               ENDDO
            ENDIF
         ENDDO
!--- Fell through both loops: unknown group or I am not a member
         ISTAT = -1
         RETURN
 100     CONTINUE
         GSIZE = GR_SZ(IGID)
         GRANK0 = GR_MEM(ILOC)
      ENDIF
#endif

#if defined(MPI_SRC)
      IF (GID .EQ. GCG__ALLGROUP) THEN
         IGID = MPI_COMM_WORLD
      ELSE
         IGID = GID
      ENDIF
!--- NOTE(review): ISTAT from MPI_COMM_RANK is overwritten by
!    MPI_COMM_SIZE without being checked.
      CALL MPI_COMM_RANK(IGID, GRANK, ISTAT)
      CALL MPI_COMM_SIZE(IGID, GSIZE, ISTAT)
      GRANK0 = 0
#endif

!--- Check if one or more processors in the group is asked to shift
!    more elements than it has.  LDLEN is the slack beyond the shifted
!    subsection; the group-wide minimum must be non-negative.
      LDLEN = LVL - (LSO + LSL - 1)
      CALL GCG_IMIN(1, GID, ISTAT, LDLEN)
      IF (LDLEN .LT. 0 .OR. ISTAT .LT. 0) THEN
         ISTAT = -1
         RETURN
      ENDIF

#if defined(SERIAL_SRC)
!--- Loop over all vectors
      DO K = 1,NV

!--- Rotate by copying into a vector, copying this to another vector
!    and copying back (!)  Can obviously be simplyfied, but how often
!    is this operation done in serial mode?
         DO J = 1, LSL
            GARR(J) = FIELD(LSO+J-1,K)
         ENDDO
!--- Positive shift: slide elements up by SHFT; the first SHFT slots
!    are refilled from the tail (WRAP) or kept unchanged (no wrap).
!    Negative shift: symmetric, sliding down.
         IF (SHFT .GT. 0) THEN
            DO I = 1, LSL-SHFT
               HARR(I+SHFT) = GARR(I)
            ENDDO
            IF (WRAP) THEN
               DO I = 1, SHFT
                  HARR(I) = GARR(LSL-SHFT+I)
               ENDDO
            ELSE
               DO I = 1, SHFT
                  HARR(I) = GARR(I)
               ENDDO
            ENDIF
         ELSE
            DO I = 1-SHFT, LSL
               HARR(I+SHFT) = GARR(I)
            ENDDO
            IF (WRAP) THEN
               DO I = 0,-1-SHFT
                  HARR(LSL-I) = GARR(-I-SHFT)
               ENDDO
            ELSE
               DO I = 0,-1-SHFT
                  HARR(LSL-I) = GARR(LSL-I)
               ENDDO
            ENDIF
         ENDIF
         DO J = 1, LSL
            FIELD(LSO+J-1,K) = HARR(J)
         ENDDO
      ENDDO

#else
!--- Send the shift length and offset of all processors to the first
!    processor in the group, so the root knows how much to gather from
!    each PE and where each PE's subsection starts.
      LST(1) = LSL
      LST(2) = LSO
      IF (GRANK .NE. 0) THEN
#if defined(SHM_SRC)
         CALL SHMEM_PUT(GLST(1,GRANK), LST, 2, GRANK0)
         CALL GCG__SHMSYNC(GRANK, ILOC, IGID)
#endif
#if defined(MPI_SRC)
#if defined(BUFFERED_MPI)
         CALL MPI_BSEND(LST, GC__ISIZE*2, MPI_BYTE, GRANK0, GCGID__ROT0,
     $        IGID, ISTAT)
#else
         CALL MPI_SEND(LST, GC__ISIZE*2, MPI_BYTE, GRANK0, GCGID__ROT0,
     $        IGID, ISTAT)
#endif
#endif
      ELSE
#if defined(SHM_SRC)
         CALL GCG__SHMSYNC(GRANK, ILOC, IGID)
#else
         DO I = 1, GSIZE-1
#if defined (MPI_SRC)
            CALL MPI_RECV(GLST(1,I), GC__ISIZE*2, MPI_BYTE, I,
     $           GCGID__ROT0, IGID, STATUS, ISTAT)
#endif
         ENDDO
#endif

!--- Exit if total length of the vectors is greater than MAX_ROTATE
         GLST(1,0) = LSL
         GLST(2,0) = LSO
         GJ = 0
         DO I = 0,GSIZE-1
            GJ = GJ + GLST(1,I)
         ENDDO
         IF (GJ .GT. MAX_ROTATE)
     $        CALL GCG__ERRLIM(1, 'RVECSHIFT', 'ROTATE', MAX_ROTATE, GJ)
      ENDIF

!--- Loop over all vectors
      DO K = 1,NV

#if defined(SHM_SRC)
!--- Non-address-matched SHMEM: publish the local address of this
!    vector's column to the root via an atomic swap on NAM_FLAG, so
!    the root can SHMEM_GET/PUT through the LARR Cray pointer.
         IF (NAM_SAFE .EQ. GC__NAM_UNA) THEN
            CALL GCG__SHMSYNC(GRANK, ILOC, IGID)
            CALL GC__GET_SWAP_LOCATION(NAM_FLAG, ME, GRANK0, ME, PTR, 0)
            PTR = LOC(FIELD(1,K))
!DIR$ SUPPRESS NAM_FLAG
            INFO = SHMEM_SWAP(NAM_FLAG(ME), PTR, GRANK0)
            IF (INFO .NE. SHMEM_SYNC_VALUE)
     $           CALL GC__LOST_LOCK(GRANK0, ME, INFO)
         ENDIF
#endif

!--- Send to first processor in the group, which do the rotate
!    and distribute back again.
!    NOTE: the IF/ELSE below deliberately spans the cpp arms -- under
!    SHM_SRC the root tests (GRANK .EQ. 0) and pulls data itself;
!    under MPI the non-root PEs (GRANK .NE. 0) send/receive and the
!    ELSE branch is the root's gather loop.  Edit with care.
#if defined(SHM_SRC)
         CALL GCG__SHMSYNC(GRANK, ILOC, IGID)
         IF (GRANK .EQ. 0) THEN
#else
         IF (GRANK .NE. 0) THEN
#if defined (MPI_SRC)
#if defined (BUFFERED_MPI)
            CALL MPI_BSEND(FIELD(LSO,K), GC__RSIZE*LSL, MPI_BYTE, GRANK0,
     $           GCGID__ROT1, IGID, ISTAT)
#else
            CALL MPI_SEND(FIELD(LSO,K), GC__RSIZE*LSL, MPI_BYTE, GRANK0,
     $           GCGID__ROT1, IGID, ISTAT)
#endif
            CALL MPI_RECV(FIELD(LSO,K), GC__RSIZE*LSL, MPI_BYTE, GRANK0,
     $           GCGID__ROT2, IGID, STATUS, ISTAT)
#endif
         ELSE
!--- MPI root: copy my own subsection first, then receive the rest
            DO J = 1, LSL
               GARR(J) = FIELD(LSO+J-1,K)
            ENDDO
            GJ = LSL
            DO I = 1, GSIZE-1
#endif
#if defined(SHM_SRC)
!--- SHM root: pull every member's subsection (including my own at
!    I=0) directly into GARR, addressing either FIELD (address-matched)
!    or the published remote address in LARR.
            GJ = 0
            DO I = 0, GSIZE-1
               IF (GID .EQ. GCG__ALLGROUP) THEN
                  IF (NAM_SAFE .EQ. GC__NAM_ALL) THEN
                     CALL SHMEM_GET(GARR(GJ+1), FIELD(GLST(2,I),K),
     $                    GLST(1,I), I)
                  ELSE
                     PTR = NAM_FLAG(I)
                     CALL SHMEM_GET(GARR(GJ+1), LARR(GLST(2,I)),
     $                    GLST(1,I), I)
                  ENDIF
               ELSE
                  IF (NAM_SAFE .EQ. GC__NAM_ALL) THEN
                     CALL SHMEM_GET(GARR(GJ+1), FIELD(GLST(2,I),K),
     $                    GLST(1,I), GR_MEM(ILOC+I))
                  ELSE
                     PTR = NAM_FLAG(GR_MEM(ILOC+I))
                     CALL SHMEM_GET(GARR(GJ+1), LARR(GLST(2,I)),
     $                    GLST(1,I), GR_MEM(ILOC+I))
                  ENDIF
               ENDIF
#endif
#if defined(MPI_SRC)
               CALL MPI_RECV(GARR(GJ+1), GC__RSIZE*GLST(1,I), MPI_BYTE,
     $              I, GCGID__ROT1, IGID, STATUS, ISTAT)
#endif
               GJ = GJ + GLST(1,I)
            ENDDO

!--- Rotate the gathered vector of total length GJ (same scheme as
!    the serial branch: slide by SHFT, then fill the vacated end from
!    the opposite end on WRAP, or keep the original values otherwise).
            IF (SHFT .GT. 0) THEN
               IF (WRAP) THEN
                  DO I = 1, SHFT
                     HARR(I) = GARR(GJ-SHFT+I)
                  ENDDO
               ELSE
                  DO I = 1, SHFT
                     HARR(I) = GARR(I)
                  ENDDO
               ENDIF
               DO I = 1, GJ-SHFT
                  HARR(I+SHFT) = GARR(I)
               ENDDO
            ELSE
               DO I = 1-SHFT, GJ
                  HARR(I+SHFT) = GARR(I)
               ENDDO
               IF (WRAP) THEN
                  DO I = 1,-SHFT
                     HARR(GJ+SHFT+I) = GARR(I)
                  ENDDO
               ELSE
                  DO I = 1,-SHFT
                     HARR(GJ+SHFT+I) = GARR(GJ+SHFT+I)
                  ENDDO
               ENDIF
            ENDIF

!--- Scatter the rotated data back.  MPI root keeps its own slice
!    locally (so GJ restarts at LSL); SHM root pushes to everyone
!    including itself (GJ restarts at 0).
#if !defined(SHM_SRC)
            DO J = 1, LSL
               FIELD(LSO+J-1,K) = HARR(J)
            ENDDO
            GJ = LSL
#else
            GJ = 0
#endif
#if defined(SHM_SRC)
            IF (GID .EQ. GCG__ALLGROUP) THEN
               DO I = 0, GSIZE-1
                  IF (NAM_SAFE .EQ. GC__NAM_ALL) THEN
                     CALL SHMEM_PUT(FIELD(GLST(2,I),K), HARR(GJ+1),
     $                    GLST(1,I), I)
                  ELSE
                     PTR = NAM_FLAG(I)
                     CALL SHMEM_PUT(LARR(GLST(2,I)), HARR(GJ+1),
     $                    GLST(1,I), I)
                  ENDIF
                  GJ = GJ + GLST(1,I)
               ENDDO
            ELSE
               DO I = 0, GSIZE-1
                  IF (NAM_SAFE .EQ. GC__NAM_ALL) THEN
                     CALL SHMEM_PUT(FIELD(GLST(2,I),K), HARR(GJ+1),
     $                    GLST(1,I), GR_MEM(ILOC+I))
                  ELSE
                     PTR = NAM_FLAG(GR_MEM(ILOC+I))
                     CALL SHMEM_PUT(LARR(GLST(2,I)), HARR(GJ+1),
     $                    GLST(1,I), GR_MEM(ILOC+I))
                  ENDIF
                  GJ = GJ + GLST(1,I)
               ENDDO
            ENDIF
#endif
#if defined(MPI_SRC)
            DO I = 1, GSIZE-1
#if defined(BUFFERED_MPI)
               CALL MPI_BSEND(HARR(GJ+1), GC__RSIZE*GLST(1,I), MPI_BYTE,
     $              I, GCGID__ROT2, IGID, ISTAT)
#else
               CALL MPI_SEND(HARR(GJ+1), GC__RSIZE*GLST(1,I), MPI_BYTE,
     $              I, GCGID__ROT2, IGID, ISTAT)
#endif
               GJ = GJ + GLST(1,I)
            ENDDO
#endif
         ENDIF

#if defined(SHM_SRC)
!--- Barrier so all puts complete before the next vector, then clear
!    the published-address flags for the next iteration.
         CALL GCG__SHMSYNC(GRANK, ILOC, IGID)
         IF (NAM_SAFE .EQ. GC__NAM_UNA) THEN
            NPROC = GC_NPROC()
            DO I = 0, NPROC-1
               NAM_FLAG(I) = 0
            ENDDO
         ENDIF
#endif
      ENDDO
#endif

      RETURN
      END