001    /**
002     *  Licensed to the Apache Software Foundation (ASF) under one or more
003     *  contributor license agreements.  See the NOTICE file distributed with
004     *  this work for additional information regarding copyright ownership.
005     *  The ASF licenses this file to You under the Apache License, Version 2.0
006     *  (the "License"); you may not use this file except in compliance with
007     *  the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     *  Unless required by applicable law or agreed to in writing, software
012     *  distributed under the License is distributed on an "AS IS" BASIS,
013     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     *  See the License for the specific language governing permissions and
015     *  limitations under the License.
016     */
017    
018    package org.apache.geronimo.transaction.log;
019    
020    import java.io.IOException;
021    import java.io.File;
022    import java.util.Collection;
023    import java.util.HashMap;
024    import java.util.Iterator;
025    import java.util.List;
026    import java.util.Map;
027    import javax.transaction.xa.Xid;
028    
029    import org.apache.geronimo.transaction.manager.LogException;
030    import org.apache.geronimo.transaction.manager.Recovery;
031    import org.apache.geronimo.transaction.manager.TransactionBranchInfo;
032    import org.apache.geronimo.transaction.manager.TransactionBranchInfoImpl;
033    import org.apache.geronimo.transaction.manager.TransactionLog;
034    import org.apache.geronimo.transaction.manager.XidFactory;
035    import org.objectweb.howl.log.Configuration;
036    import org.objectweb.howl.log.LogClosedException;
037    import org.objectweb.howl.log.LogConfigurationException;
038    import org.objectweb.howl.log.LogFileOverflowException;
039    import org.objectweb.howl.log.LogRecord;
040    import org.objectweb.howl.log.LogRecordSizeException;
041    import org.objectweb.howl.log.LogRecordType;
042    import org.objectweb.howl.log.ReplayListener;
043    import org.objectweb.howl.log.xa.XACommittingTx;
044    import org.objectweb.howl.log.xa.XALogRecord;
045    import org.objectweb.howl.log.xa.XALogger;
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    
049    /**
050     * @version $Rev: 723385 $ $Date: 2008-12-04 12:55:02 -0500 (Thu, 04 Dec 2008) $
051     */
052    public class HOWLLog implements TransactionLog {
053    //    static final byte PREPARE = 1;
054        //these are used as debugging aids only
055        private static final byte COMMIT = 2;
056        private static final byte ROLLBACK = 3;
057    
058        static final String[] TYPE_NAMES = {null, "PREPARE", "COMMIT", "ROLLBACK"};
059    
060        private static final Logger log = LoggerFactory.getLogger(HOWLLog.class);
061    
062        private File serverBaseDir;
063        private String logFileDir;
064    
065        private final XidFactory xidFactory;
066    
067        private final XALogger logger;
068        private final Configuration configuration = new Configuration();
069        private boolean started = false;
070        private HashMap recovered;
071    
072        public HOWLLog(String bufferClassName,
073                       int bufferSize,
074                       boolean checksumEnabled,
075                       boolean adler32Checksum,
076                       int flushSleepTimeMilliseconds,
077                       String logFileDir,
078                       String logFileExt,
079                       String logFileName,
080                       int maxBlocksPerFile,
081                       int maxBuffers,
082                       int maxLogFiles,
083                       int minBuffers,
084                       int threadsWaitingForceThreshold,
085                       XidFactory xidFactory,
086                       File serverBaseDir) throws IOException, LogConfigurationException {
087            this.serverBaseDir = serverBaseDir;
088            setBufferClassName(bufferClassName);
089            setBufferSizeKBytes(bufferSize);
090            setChecksumEnabled(checksumEnabled);
091            setAdler32Checksum(adler32Checksum);
092            setFlushSleepTimeMilliseconds(flushSleepTimeMilliseconds);
093            //setLogFileDir(logFileDir);
094            this.logFileDir = logFileDir;
095            setLogFileExt(logFileExt);
096            setLogFileName(logFileName);
097            setMaxBlocksPerFile(maxBlocksPerFile);
098            setMaxBuffers(maxBuffers);
099            setMaxLogFiles(maxLogFiles);
100            setMinBuffers(minBuffers);
101            setThreadsWaitingForceThreshold(threadsWaitingForceThreshold);
102            this.xidFactory = xidFactory;
103            this.logger = new XALogger(configuration);
104        }
105    
106        public String getLogFileDir() {
107            return logFileDir;
108        }
109    
110        public void setLogFileDir(String logDirName) {
111            File logDir = new File(logDirName);
112            if (!logDir.isAbsolute()) {
113                logDir = new File(serverBaseDir, logDirName);
114            }
115    
116            this.logFileDir = logDirName;
117            if (started) {
118                configuration.setLogFileDir(logDir.getAbsolutePath());
119            }
120        }
121    
122        public String getLogFileExt() {
123            return configuration.getLogFileExt();
124        }
125    
126        public void setLogFileExt(String logFileExt) {
127            configuration.setLogFileExt(logFileExt);
128        }
129    
130        public String getLogFileName() {
131            return configuration.getLogFileName();
132        }
133    
134        public void setLogFileName(String logFileName) {
135            configuration.setLogFileName(logFileName);
136        }
137    
138        public boolean isChecksumEnabled() {
139            return configuration.isChecksumEnabled();
140        }
141    
142        public void setChecksumEnabled(boolean checksumOption) {
143            configuration.setChecksumEnabled(checksumOption);
144        }
145    
146        public boolean isAdler32ChecksumEnabled() {
147            return configuration.isAdler32ChecksumEnabled();
148        }
149    
150        public void setAdler32Checksum(boolean checksumOption) {
151            configuration.setAdler32Checksum(checksumOption);
152        }
153    
154        public int getBufferSizeKBytes() {
155            return configuration.getBufferSize();
156        }
157    
158        public void setBufferSizeKBytes(int bufferSize) throws LogConfigurationException {
159            configuration.setBufferSize(bufferSize);
160        }
161    
162        public String getBufferClassName() {
163            return configuration.getBufferClassName();
164        }
165    
166        public void setBufferClassName(String bufferClassName) {
167            configuration.setBufferClassName(bufferClassName);
168        }
169    
170        public int getMaxBuffers() {
171            return configuration.getMaxBuffers();
172        }
173    
174        public void setMaxBuffers(int maxBuffers) throws LogConfigurationException {
175            configuration.setMaxBuffers(maxBuffers);
176        }
177    
178        public int getMinBuffers() {
179            return configuration.getMinBuffers();
180        }
181    
182        public void setMinBuffers(int minBuffers) throws LogConfigurationException {
183            configuration.setMinBuffers(minBuffers);
184        }
185    
186        public int getFlushSleepTimeMilliseconds() {
187            return configuration.getFlushSleepTime();
188        }
189    
190        public void setFlushSleepTimeMilliseconds(int flushSleepTime) {
191            configuration.setFlushSleepTime(flushSleepTime);
192        }
193    
194        public int getThreadsWaitingForceThreshold() {
195            return configuration.getThreadsWaitingForceThreshold();
196        }
197    
198        public void setThreadsWaitingForceThreshold(int threadsWaitingForceThreshold) {
199            configuration.setThreadsWaitingForceThreshold(threadsWaitingForceThreshold == -1 ? Integer.MAX_VALUE : threadsWaitingForceThreshold);
200        }
201    
202        public int getMaxBlocksPerFile() {
203            return configuration.getMaxBlocksPerFile();
204        }
205    
206        public void setMaxBlocksPerFile(int maxBlocksPerFile) {
207            configuration.setMaxBlocksPerFile(maxBlocksPerFile == -1 ? Integer.MAX_VALUE : maxBlocksPerFile);
208        }
209    
210        public int getMaxLogFiles() {
211            return configuration.getMaxLogFiles();
212        }
213    
214        public void setMaxLogFiles(int maxLogFiles) {
215            configuration.setMaxLogFiles(maxLogFiles);
216        }
217    
218        public void doStart() throws Exception {
219            started = true;
220            setLogFileDir(logFileDir);
221            log.debug("Initiating transaction manager recovery");
222            recovered = new HashMap();
223    
224            logger.open(null);
225    
226            ReplayListener replayListener = new GeronimoReplayListener(xidFactory, recovered);
227            logger.replayActiveTx(replayListener);
228    
229            log.debug("In doubt transactions recovered from log");
230        }
231    
232        public void doStop() throws Exception {
233            started = false;
234            logger.close();
235            recovered = null;
236        }
237    
238        public void doFail() {
239        }
240    
241        public void begin(Xid xid) throws LogException {
242        }
243    
244        public Object prepare(Xid xid, List branches) throws LogException {
245            int branchCount = branches.size();
246            byte[][] data = new byte[3 + 2 * branchCount][];
247            data[0] = intToBytes(xid.getFormatId());
248            data[1] = xid.getGlobalTransactionId();
249            data[2] = xid.getBranchQualifier();
250            int i = 3;
251            for (Iterator iterator = branches.iterator(); iterator.hasNext();) {
252                TransactionBranchInfo transactionBranchInfo = (TransactionBranchInfo) iterator.next();
253                data[i++] = transactionBranchInfo.getBranchXid().getBranchQualifier();
254                data[i++] = transactionBranchInfo.getResourceName().getBytes();
255            }
256            try {
257                XACommittingTx committingTx = logger.putCommit(data);
258                return committingTx;
259            } catch (LogClosedException e) {
260                throw (IllegalStateException) new IllegalStateException().initCause(e);
261            } catch (LogRecordSizeException e) {
262                throw (IllegalStateException) new IllegalStateException().initCause(e);
263            } catch (LogFileOverflowException e) {
264                throw (IllegalStateException) new IllegalStateException().initCause(e);
265            } catch (InterruptedException e) {
266                throw (IllegalStateException) new IllegalStateException().initCause(e);
267            } catch (IOException e) {
268                throw new LogException(e);
269            }
270        }
271    
272        public void commit(Xid xid, Object logMark) throws LogException {
273            //the data is theoretically unnecessary but is included to help with debugging and because HOWL currently requires it.
274            byte[][] data = new byte[4][];
275            data[0] = new byte[]{COMMIT};
276            data[1] = intToBytes(xid.getFormatId());
277            data[2] = xid.getGlobalTransactionId();
278            data[3] = xid.getBranchQualifier();
279            try {
280                logger.putDone(data, (XACommittingTx) logMark);
281    //            logger.putDone(null, (XACommittingTx) logMark);
282            } catch (LogClosedException e) {
283                throw (IllegalStateException) new IllegalStateException().initCause(e);
284            } catch (LogRecordSizeException e) {
285                throw (IllegalStateException) new IllegalStateException().initCause(e);
286            } catch (LogFileOverflowException e) {
287                throw (IllegalStateException) new IllegalStateException().initCause(e);
288            } catch (InterruptedException e) {
289                throw (IllegalStateException) new IllegalStateException().initCause(e);
290            } catch (IOException e) {
291                throw new LogException(e);
292            }
293        }
294    
295        public void rollback(Xid xid, Object logMark) throws LogException {
296            //the data is theoretically unnecessary but is included to help with debugging and because HOWL currently requires it.
297            byte[][] data = new byte[4][];
298            data[0] = new byte[]{ROLLBACK};
299            data[1] = intToBytes(xid.getFormatId());
300            data[2] = xid.getGlobalTransactionId();
301            data[3] = xid.getBranchQualifier();
302            try {
303                logger.putDone(data, (XACommittingTx) logMark);
304    //            logger.putDone(null, (XACommittingTx) logMark);
305            } catch (LogClosedException e) {
306                throw (IllegalStateException) new IllegalStateException().initCause(e);
307            } catch (LogRecordSizeException e) {
308                throw (IllegalStateException) new IllegalStateException().initCause(e);
309            } catch (LogFileOverflowException e) {
310                throw (IllegalStateException) new IllegalStateException().initCause(e);
311            } catch (InterruptedException e) {
312                throw (IllegalStateException) new IllegalStateException().initCause(e);
313            } catch (IOException e) {
314                throw new LogException(e);
315            }
316        }
317    
318        public Collection recover(XidFactory xidFactory) throws LogException {
319            log.debug("Initiating transaction manager recovery");
320            Map recovered = new HashMap();
321            ReplayListener replayListener = new GeronimoReplayListener(xidFactory, recovered);
322            logger.replayActiveTx(replayListener);
323            log.debug("In doubt transactions recovered from log");
324            return recovered.values();
325        }
326    
327        public String getXMLStats() {
328            return logger.getStats();
329        }
330    
331        public int getAverageForceTime() {
332            return 0;//logger.getAverageForceTime();
333        }
334    
335        public int getAverageBytesPerForce() {
336            return 0;//logger.getAverageBytesPerForce();
337        }
338    
339        private byte[] intToBytes(int formatId) {
340            byte[] buffer = new byte[4];
341            buffer[0] = (byte) (formatId >> 24);
342            buffer[1] = (byte) (formatId >> 16);
343            buffer[2] = (byte) (formatId >> 8);
344            buffer[3] = (byte) (formatId >> 0);
345            return buffer;
346        }
347    
348        private int bytesToInt(byte[] buffer) {
349            return ((int) buffer[0]) << 24 + ((int) buffer[1]) << 16 + ((int) buffer[2]) << 8 + ((int) buffer[3]) << 0;
350        }
351    
352        private class GeronimoReplayListener implements ReplayListener {
353    
354            private final XidFactory xidFactory;
355            private final Map recoveredTx;
356    
357            public GeronimoReplayListener(XidFactory xidFactory, Map recoveredTx) {
358                this.xidFactory = xidFactory;
359                this.recoveredTx = recoveredTx;
360            }
361    
362            public void onRecord(LogRecord plainlr) {
363                XALogRecord lr = (XALogRecord) plainlr;
364                short recordType = lr.type;
365                XACommittingTx tx = lr.getTx();
366                if (recordType == LogRecordType.XACOMMIT) {
367    
368                    byte[][] data = tx.getRecord();
369    
370                    assert data[0].length == 4;
371                    int formatId = bytesToInt(data[1]);
372                    byte[] globalId = data[1];
373                    byte[] branchId = data[2];
374                    Xid masterXid = xidFactory.recover(formatId, globalId, branchId);
375    
376                    Recovery.XidBranchesPair xidBranchesPair = new Recovery.XidBranchesPair(masterXid, tx);
377                    recoveredTx.put(masterXid, xidBranchesPair);
378                    log.debug("recovered prepare record for master xid: " + masterXid);
379                    for (int i = 3; i < data.length; i += 2) {
380                        byte[] branchBranchId = data[i];
381                        String name = new String(data[i + 1]);
382    
383                        Xid branchXid = xidFactory.recover(formatId, globalId, branchBranchId);
384                        TransactionBranchInfoImpl branchInfo = new TransactionBranchInfoImpl(branchXid, name);
385                        xidBranchesPair.addBranch(branchInfo);
386                        log.debug("recovered branch for resource manager, branchId " + name + ", " + branchXid);
387                    }
388                } else {
389                    if(recordType != LogRecordType.END_OF_LOG) { // This value crops up every time the server is started
390                        log.warn("Received unexpected log record: " + lr +" ("+recordType+")");
391                    }
392                }
393            }
394    
395            public void onError(org.objectweb.howl.log.LogException exception) {
396                log.error("Error during recovery: ", exception);
397            }
398    
399            public LogRecord getLogRecord() {
400                //TODO justify this size estimate
401                return new LogRecord(10 * 2 * Xid.MAXBQUALSIZE);
402            }
403    
404        }
405    }