001    /**
002     *  Licensed to the Apache Software Foundation (ASF) under one or more
003     *  contributor license agreements.  See the NOTICE file distributed with
004     *  this work for additional information regarding copyright ownership.
005     *  The ASF licenses this file to You under the Apache License, Version 2.0
006     *  (the "License"); you may not use this file except in compliance with
007     *  the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     *  Unless required by applicable law or agreed to in writing, software
012     *  distributed under the License is distributed on an "AS IS" BASIS,
013     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     *  See the License for the specific language governing permissions and
015     *  limitations under the License.
016     */
017    
018    package org.apache.geronimo.timer.jdbc;
019    
020    import com.thoughtworks.xstream.XStream;
021    import org.apache.geronimo.timer.PersistenceException;
022    import org.apache.geronimo.timer.Playback;
023    import org.apache.geronimo.timer.WorkInfo;
024    import org.apache.geronimo.timer.WorkerPersistence;
025    
026    import javax.sql.DataSource;
027    import java.sql.Connection;
028    import java.sql.PreparedStatement;
029    import java.sql.ResultSet;
030    import java.sql.SQLException;
031    import java.sql.Types;
032    import java.util.ArrayList;
033    import java.util.Collection;
034    import java.util.Date;
035    
036    /**
037     * TODO use an insert returning or stored procedure to insert.
038     *
039     * @version $Rev: 476049 $ $Date: 2006-11-16 23:35:17 -0500 (Thu, 16 Nov 2006) $
040     */
041    public class JDBCWorkerPersistence implements WorkerPersistence {
042    
043        private static final String createSequenceSQL = "create sequence timertasks_seq";
044        private static final String createTableSQLWithSequence = "create table timertasks (id long primary key, serverid varchar(256) not null, timerkey varchar(256) not null, userid varchar(4096), userinfo varchar(4096), firsttime long not null, period long, atfixedrate boolean not null)";
045        private static final String createTableSQLWithIdentity = "create table timertasks (id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), serverid varchar(256) not null, timerkey varchar(256) not null, userid varchar(4096), userinfo varchar(4096), firsttime NUMERIC(18,0) not null, period NUMERIC(18, 0), atfixedrate CHAR(1))";
046        private static final String sequenceSQL = "select timertasks_seq.nextval";
047        private static final String identitySQL = "values IDENTITY_VAL_LOCAL()";
048        private static final String insertSQLWithSequence = "insert into timertasks (id, serverid, timerkey, userid, userinfo, firsttime, period, atfixedrate) values (?, ?, ?, ?, ?, ?, ?, ?)";
049        private static final String insertSQLWithIdentity = "insert into timertasks (serverid, timerkey, userid, userinfo, firsttime, period, atfixedrate) values (?, ?, ?, ?, ?, ?, ?)";
050        private static final String deleteSQL = "delete from timertasks where id=?";
051        private static final String selectSQL = "select id, userid, userinfo, firsttime, period, atfixedrate from timertasks where serverid = ? and timerkey=?";
052        private static final String fixedRateUpdateSQL = "update timertasks set firsttime = firsttime + period where id = ?";
053        private static final String intervalUpdateSQL = "update timertasks set firsttime = ? where id = ?";
054        private static final String selectByKeySQL = "select id from timertasks where serverid = ? and timerkey = ? and (userid = ? or ? is null)";
055    
056        private final String serverUniqueId;
057        private final DataSource dataSource;
058        private boolean useSequence = false;
059    
060        protected JDBCWorkerPersistence(String serverUniqueId, DataSource datasource, boolean useSequence) throws SQLException {
061            this.serverUniqueId = serverUniqueId;
062            this.dataSource = datasource;
063            this.useSequence = useSequence;
064            if (this.useSequence) {
065                execSQL(createSequenceSQL);
066                execSQL(createTableSQLWithSequence);
067            } else {
068                execSQL(createTableSQLWithIdentity);
069            }
070        }
071    
072    
073        public void save(WorkInfo workInfo) throws PersistenceException {
074            boolean threwException = false;
075            Connection c = getConnection();
076            try {
077                if (useSequence) {
078                    long id;
079                    PreparedStatement seqStatement = c.prepareStatement(sequenceSQL);
080                    try {
081                        ResultSet seqRS = seqStatement.executeQuery();
082                        try {
083                            seqRS.next();
084                            id = seqRS.getLong(1);
085                        } finally {
086                            seqRS.close();
087                        }
088                    } finally {
089                        seqStatement.close();
090                    }
091                    workInfo.setId(id);
092                    PreparedStatement insertStatement = c.prepareStatement(insertSQLWithSequence);
093                    try {
094                        String serializedUserId = serialize(workInfo.getUserId());
095                        String serializedUserKey = serialize(workInfo.getUserInfo());
096                        insertStatement.setLong(1, id);
097                        insertStatement.setString(2, serverUniqueId);
098                        insertStatement.setString(3, workInfo.getKey());
099                        insertStatement.setString(4, serializedUserId);
100                        insertStatement.setString(5, serializedUserKey);
101                        insertStatement.setLong(6, workInfo.getTime().getTime());
102                        if (workInfo.getPeriod() == null) {
103                            insertStatement.setNull(7, Types.NUMERIC);
104                        } else {
105                            insertStatement.setLong(7, workInfo.getPeriod().longValue());
106                        }
107                        insertStatement.setBoolean(8, workInfo.getAtFixedRate());
108                        int result = insertStatement.executeUpdate();
109                        if (result != 1) {
110                            throw new PersistenceException("Could not insert!");
111                        }
112                    } finally {
113                        insertStatement.close();
114                    }
115                } else {
116                    PreparedStatement insertStatement = c.prepareStatement(insertSQLWithIdentity);
117                    try {
118                        String serializedUserId = serialize(workInfo.getUserId());
119                        String serializedUserKey = serialize(workInfo.getUserInfo());
120                        insertStatement.setString(1, serverUniqueId);
121                        insertStatement.setString(2, workInfo.getKey());
122                        insertStatement.setString(3, serializedUserId);
123                        insertStatement.setString(4, serializedUserKey);
124                        insertStatement.setLong(5, workInfo.getTime().getTime());
125                        if (workInfo.getPeriod() == null) {
126                            insertStatement.setNull(6, Types.NUMERIC);
127                        } else {
128                            insertStatement.setLong(6, workInfo.getPeriod().longValue());
129                        }
130                        insertStatement.setBoolean(7, workInfo.getAtFixedRate());
131                        int result = insertStatement.executeUpdate();
132                        if (result != 1) {
133                            throw new PersistenceException("Could not insert!");
134                        }
135                    } finally {
136                        insertStatement.close();
137                    }
138                    long id;
139                    PreparedStatement identityStatement = c.prepareStatement(identitySQL);
140                    try {
141                        ResultSet seqRS = identityStatement.executeQuery();
142                        try {
143                            seqRS.next();
144                            id = seqRS.getLong(1);
145                        } finally {
146                            seqRS.close();
147                        }
148                    } finally {
149                        identityStatement.close();
150                    }
151                    workInfo.setId(id);
152                }
153            } catch (SQLException e) {
154                threwException = true;
155                throw new PersistenceException(e);
156            } finally {
157                close(c, !threwException);
158            }
159        }
160    
161        public void cancel(long id) throws PersistenceException {
162            boolean threwException = false;
163    
164            Connection c = getConnection();
165            try {
166                PreparedStatement deleteStatement = c.prepareStatement(deleteSQL);
167                try {
168                    deleteStatement.setLong(1, id);
169                    deleteStatement.execute();
170                } finally {
171                    deleteStatement.close();
172                }
173            } catch (SQLException e) {
174                threwException = true;
175                throw new PersistenceException(e);
176            } finally {
177                close(c, !threwException);
178            }
179        }
180    
181        public void playback(String key, Playback playback) throws PersistenceException {
182            boolean threwException = false;
183            Connection c = getConnection();
184            try {
185                PreparedStatement selectStatement = c.prepareStatement(selectSQL);
186                selectStatement.setString(1, serverUniqueId);
187                selectStatement.setString(2, key);
188                try {
189                    ResultSet taskRS = selectStatement.executeQuery();
190                    try {
191                        while (taskRS.next()) {
192                            long id = taskRS.getLong(1);
193                            String serizalizedUserId = taskRS.getString(2);
194                            Object userId = deserialize(serizalizedUserId);
195                            String serializedUserInfo = taskRS.getString(3);
196                            Object userInfo = deserialize(serializedUserInfo);
197                            long timeMillis = taskRS.getLong(4);
198                            Date time = new Date(timeMillis);
199                            Long period = null;
200                            period = new Long(taskRS.getLong(5));
201                            if (!taskRS.wasNull()) {
202                                period = null;
203                            }
204                            boolean atFixedRate = taskRS.getBoolean(6);
205                            //TODO make sure the reference to this is ok, meaning we can't use a handle to this WorkerPersistence.
206                            WorkInfo workInfo = new WorkInfo(key, userId, userInfo, time, period, atFixedRate);
207                            workInfo.setId(id);
208                            playback.schedule(workInfo);
209                        }
210                    } finally {
211                        taskRS.close();
212                    }
213                } finally {
214                    selectStatement.close();
215                }
216            } catch (SQLException e) {
217                threwException = true;
218                throw new PersistenceException(e);
219            } finally {
220                close(c, !threwException);
221            }
222        }
223    
224        public void fixedRateWorkPerformed(long id) throws PersistenceException {
225            boolean threwException = false;
226            Connection c = getConnection();
227            try {
228                PreparedStatement updateStatement = c.prepareStatement(fixedRateUpdateSQL);
229                try {
230                    updateStatement.setLong(1, id);
231                    updateStatement.execute();
232                } finally {
233                    updateStatement.close();
234                }
235            } catch (SQLException e) {
236                threwException = true;
237                throw new PersistenceException(e);
238            } finally {
239                close(c, !threwException);
240            }
241        }
242    
243        public void intervalWorkPerformed(long id, long period) throws PersistenceException {
244            boolean threwException = false;
245            long next = System.currentTimeMillis() + period;
246            Connection c = getConnection();
247            try {
248                PreparedStatement updateStatement = c.prepareStatement(intervalUpdateSQL);
249                try {
250                    updateStatement.setLong(1, next);
251                    updateStatement.setLong(2, id);
252                    updateStatement.execute();
253                } finally {
254                    updateStatement.close();
255                }
256            } catch (SQLException e) {
257                threwException = true;
258                throw new PersistenceException(e);
259            } finally {
260                close(c, !threwException);
261            }
262        }
263    
264        public Collection getIdsByKey(String key, Object userId) throws PersistenceException {
265            boolean threwException = false;
266    
267            Collection ids = new ArrayList();
268            Connection c = getConnection();
269            try {
270                PreparedStatement selectStatement = c.prepareStatement(selectByKeySQL);
271                selectStatement.setString(1, serverUniqueId);
272                selectStatement.setString(2, key);
273                if (userId == null) {
274                    selectStatement.setNull(3, Types.VARCHAR);
275                    selectStatement.setNull(4, Types.VARCHAR);
276                } else {
277                    String userIdString = serialize(userId);
278                    selectStatement.setString(3, userIdString);
279                    selectStatement.setString(4, userIdString);
280                }
281                try {
282                    ResultSet taskRS = selectStatement.executeQuery();
283                    try {
284                        while (taskRS.next()) {
285                            long id = taskRS.getLong(1);
286                            ids.add(new Long(id));
287                        }
288                    } finally {
289                        taskRS.close();
290                    }
291                } finally {
292                    selectStatement.close();
293                }
294            } catch (SQLException e) {
295                threwException = true;
296                throw new PersistenceException(e);
297            } finally {
298                close(c, !threwException);
299            }
300    
301            return ids;
302        }
303    
304        private String serialize(Object task) {
305            XStream xStream = new XStream();
306            return xStream.toXML(task);
307        }
308    
309        private Object deserialize(String serializedRunnable) {
310            XStream xStream = new XStream();
311            return xStream.fromXML(serializedRunnable);
312        }
313    
314        private void execSQL(String sql) throws SQLException {
315            Connection c = dataSource.getConnection();
316            try {
317                PreparedStatement updateStatement = c.prepareStatement(sql);
318                try {
319                    updateStatement.execute();
320                } catch (SQLException e) {
321                    //ignore... table already exists.
322                } finally {
323                    updateStatement.close();
324                }
325            } finally {
326                c.close();
327            }
328        }
329    
330        private Connection getConnection() throws PersistenceException {
331            try {
332                return dataSource.getConnection();
333            } catch (Exception e) {
334                throw new PersistenceException(e);
335            }
336        }
337    
338        private void close(Connection c, boolean reportSqlException) throws PersistenceException {
339            try {
340                c.close();
341            } catch (Exception e) {
342                if (!reportSqlException) {
343                    throw new PersistenceException(e);
344                }
345            }
346        }
347    }