001    /**
002     *
003     * Copyright 2004 The Apache Software Foundation
004     *
005     *  Licensed under the Apache License, Version 2.0 (the "License");
006     *  you may not use this file except in compliance with the License.
007     *  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     *  Unless required by applicable law or agreed to in writing, software
012     *  distributed under the License is distributed on an "AS IS" BASIS,
013     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     *  See the License for the specific language governing permissions and
015     *  limitations under the License.
016     */
017    
018    package org.apache.geronimo.timer.jdbc;
019    
020    import java.sql.Connection;
021    import java.sql.PreparedStatement;
022    import java.sql.ResultSet;
023    import java.sql.SQLException;
024    import java.sql.Types;
025    import java.util.ArrayList;
026    import java.util.Collection;
027    import java.util.Date;
028    import javax.sql.DataSource;
029    
030    import com.thoughtworks.xstream.XStream;
031    import org.apache.geronimo.timer.PersistenceException;
032    import org.apache.geronimo.timer.Playback;
033    import org.apache.geronimo.timer.WorkInfo;
034    import org.apache.geronimo.timer.WorkerPersistence;
035    
036    /**
037     * TODO use an insert returning or stored procedure to insert.
038     *
039     * @version $Rev: 355877 $ $Date: 2005-12-10 18:48:27 -0800 (Sat, 10 Dec 2005) $
040     */
041    public class JDBCWorkerPersistence implements WorkerPersistence {
042    
043        private static final String createSequenceSQL = "create sequence timertasks_seq";
044        private static final String createTableSQLWithSequence = "create table timertasks (id long primary key, serverid varchar(256) not null, timerkey varchar(256) not null, userid varchar(4096), userinfo varchar(4096), firsttime long not null, period long, atfixedrate boolean not null)";
045        private static final String createTableSQLWithIdentity = "create table timertasks (id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), serverid varchar(256) not null, timerkey varchar(256) not null, userid varchar(4096), userinfo varchar(4096), firsttime NUMERIC(18,0) not null, period NUMERIC(18, 0), atfixedrate CHAR(1))";
046        private static final String sequenceSQL = "select timertasks_seq.nextval";
047        private static final String identitySQL = "values IDENTITY_VAL_LOCAL()";
048        private static final String insertSQLWithSequence = "insert into timertasks (id, serverid, timerkey, userid, userinfo, firsttime, period, atfixedrate) values (?, ?, ?, ?, ?, ?, ?, ?)";
049        private static final String insertSQLWithIdentity = "insert into timertasks (serverid, timerkey, userid, userinfo, firsttime, period, atfixedrate) values (?, ?, ?, ?, ?, ?, ?)";
050        private static final String deleteSQL = "delete from timertasks where id=?";
051        private static final String selectSQL = "select id, userid, userinfo, firsttime, period, atfixedrate from timertasks where serverid = ? and timerkey=?";
052        private static final String fixedRateUpdateSQL = "update timertasks set firsttime = firsttime + period where id = ?";
053        private static final String intervalUpdateSQL = "update timertasks set firsttime = ? where id = ?";
054        private static final String selectByKeySQL = "select id from timertasks where serverid = ? and timerkey = ? and (userid = ? or ? is null)";
055    
056        private final String serverUniqueId;
057        private final DataSource dataSource;
058        private boolean useSequence = false;
059    
060        protected JDBCWorkerPersistence(String serverUniqueId, DataSource datasource, boolean useSequence) throws SQLException {
061            this.serverUniqueId = serverUniqueId;
062            this.dataSource = datasource;
063            this.useSequence = useSequence;
064            if (this.useSequence) {
065                execSQL(createSequenceSQL);
066                execSQL(createTableSQLWithSequence);
067            } else {
068                execSQL(createTableSQLWithIdentity);
069            }
070        }
071    
072    
073        public void save(WorkInfo workInfo) throws PersistenceException {
074            try {
075                Connection c = dataSource.getConnection();
076                try {
077                    if (useSequence) {
078                        long id;
079                        PreparedStatement seqStatement = c.prepareStatement(sequenceSQL);
080                        try {
081                            ResultSet seqRS = seqStatement.executeQuery();
082                            try {
083                                seqRS.next();
084                                id = seqRS.getLong(1);
085                            } finally {
086                                seqRS.close();
087                            }
088                        } finally {
089                            seqStatement.close();
090                        }
091                        workInfo.setId(id);
092                        PreparedStatement insertStatement = c.prepareStatement(insertSQLWithSequence);
093                        try {
094                            String serializedUserId = serialize(workInfo.getUserId());
095                            String serializedUserKey = serialize(workInfo.getUserInfo());
096                            insertStatement.setLong(1, id);
097                            insertStatement.setString(2, serverUniqueId);
098                            insertStatement.setString(3, workInfo.getKey());
099                            insertStatement.setString(4, serializedUserId);
100                            insertStatement.setString(5, serializedUserKey);
101                            insertStatement.setLong(6, workInfo.getTime().getTime());
102                            if (workInfo.getPeriod() == null) {
103                                insertStatement.setNull(7, Types.NUMERIC);
104                            } else {
105                                insertStatement.setLong(7, workInfo.getPeriod().longValue());
106                            }
107                            insertStatement.setBoolean(8, workInfo.getAtFixedRate());
108                            int result = insertStatement.executeUpdate();
109                            if (result != 1) {
110                                throw new PersistenceException("Could not insert!");
111                            }
112                        } finally {
113                            insertStatement.close();
114                        }
115                    } else {
116                        PreparedStatement insertStatement = c.prepareStatement(insertSQLWithIdentity);
117                        try {
118                            String serializedUserId = serialize(workInfo.getUserId());
119                            String serializedUserKey = serialize(workInfo.getUserInfo());
120                            insertStatement.setString(1, serverUniqueId);
121                            insertStatement.setString(2, workInfo.getKey());
122                            insertStatement.setString(3, serializedUserId);
123                            insertStatement.setString(4, serializedUserKey);
124                            insertStatement.setLong(5, workInfo.getTime().getTime());
125                            if (workInfo.getPeriod() == null) {
126                                insertStatement.setNull(6, Types.NUMERIC);
127                            } else {
128                                insertStatement.setLong(6, workInfo.getPeriod().longValue());
129                            }
130                            insertStatement.setBoolean(7, workInfo.getAtFixedRate());
131                            int result = insertStatement.executeUpdate();
132                            if (result != 1) {
133                                throw new PersistenceException("Could not insert!");
134                            }
135                        } finally {
136                            insertStatement.close();
137                        }
138                        long id;
139                        PreparedStatement identityStatement = c.prepareStatement(identitySQL);
140                        try {
141                            ResultSet seqRS = identityStatement.executeQuery();
142                            try {
143                                seqRS.next();
144                                id = seqRS.getLong(1);
145                            } finally {
146                                seqRS.close();
147                            }
148                        } finally {
149                            identityStatement.close();
150                        }
151                        workInfo.setId(id);
152                    }
153                } finally {
154                    c.close();
155                }
156            } catch (SQLException e) {
157                throw new PersistenceException(e);
158            }
159        }
160    
161        public void cancel(long id) throws PersistenceException {
162            try {
163                Connection c = dataSource.getConnection();
164                try {
165                    PreparedStatement deleteStatement = c.prepareStatement(deleteSQL);
166                    try {
167                        deleteStatement.setLong(1, id);
168                        deleteStatement.execute();
169                    } finally {
170                        deleteStatement.close();
171                    }
172                } finally {
173                    c.close();
174                }
175            } catch (SQLException e) {
176                throw new PersistenceException(e);
177            }
178    
179        }
180    
181        public void playback(String key, Playback playback) throws PersistenceException {
182            try {
183                Connection c = dataSource.getConnection();
184                try {
185                    PreparedStatement selectStatement = c.prepareStatement(selectSQL);
186                    selectStatement.setString(1, serverUniqueId);
187                    selectStatement.setString(2, key);
188                    try {
189                        ResultSet taskRS = selectStatement.executeQuery();
190                        try {
191                            while (taskRS.next()) {
192                                long id = taskRS.getLong(1);
193                                String serizalizedUserId = taskRS.getString(2);
194                                Object userId = deserialize(serizalizedUserId);
195                                String serializedUserInfo = taskRS.getString(3);
196                                Object userInfo = deserialize(serializedUserInfo);
197                                long timeMillis = taskRS.getLong(4);
198                                Date time = new Date(timeMillis);
199                                Long period = null;
200                                period = new Long(taskRS.getLong(5));
201                                if (!taskRS.wasNull()) {
202                                    period = null;
203                                }
204                                boolean atFixedRate = taskRS.getBoolean(6);
205                                //TODO make sure the reference to this is ok, meaning we can't use a handle to this WorkerPersistence.
206                                WorkInfo workInfo = new WorkInfo(key, userId, userInfo, time, period, atFixedRate);
207                                workInfo.setId(id);
208                                playback.schedule(workInfo);
209                            }
210                        } finally {
211                            taskRS.close();
212                        }
213                    } finally {
214                        selectStatement.close();
215                    }
216                } finally {
217                    c.close();
218                }
219            } catch (SQLException e) {
220                throw new PersistenceException(e);
221            }
222        }
223    
224        public void fixedRateWorkPerformed(long id) throws PersistenceException {
225            try {
226                Connection c = dataSource.getConnection();
227                try {
228                    PreparedStatement updateStatement = c.prepareStatement(fixedRateUpdateSQL);
229                    try {
230                        updateStatement.setLong(1, id);
231                        updateStatement.execute();
232                    } finally {
233                        updateStatement.close();
234                    }
235                } finally {
236                    c.close();
237                }
238            } catch (SQLException e) {
239                throw new PersistenceException(e);
240            }
241        }
242    
243        public void intervalWorkPerformed(long id, long period) throws PersistenceException {
244            long next = System.currentTimeMillis() + period;
245            try {
246                Connection c = dataSource.getConnection();
247                try {
248                    PreparedStatement updateStatement = c.prepareStatement(intervalUpdateSQL);
249                    try {
250                        updateStatement.setLong(1, next);
251                        updateStatement.setLong(2, id);
252                        updateStatement.execute();
253                    } finally {
254                        updateStatement.close();
255                    }
256                } finally {
257                    c.close();
258                }
259            } catch (SQLException e) {
260                throw new PersistenceException(e);
261            }
262        }
263    
264        public Collection getIdsByKey(String key, Object userId) throws PersistenceException {
265            Collection ids = new ArrayList();
266            try {
267                Connection c = dataSource.getConnection();
268                try {
269                    PreparedStatement selectStatement = c.prepareStatement(selectByKeySQL);
270                    selectStatement.setString(1, serverUniqueId);
271                    selectStatement.setString(2, key);
272                    if (userId == null) {
273                        selectStatement.setNull(3, Types.VARCHAR);
274                        selectStatement.setNull(4, Types.VARCHAR);
275                    } else {
276                        String userIdString = serialize(userId);
277                        selectStatement.setString(3, userIdString);
278                        selectStatement.setString(4, userIdString);
279                    }
280                    try {
281                        ResultSet taskRS = selectStatement.executeQuery();
282                        try {
283                            while (taskRS.next()) {
284                                long id = taskRS.getLong(1);
285                                ids.add(new Long(id));
286                            }
287                        } finally {
288                            taskRS.close();
289                        }
290                    } finally {
291                        selectStatement.close();
292                    }
293                } finally {
294                    c.close();
295                }
296            } catch (SQLException e) {
297                throw new PersistenceException(e);
298            }
299            return ids;
300        }
301    
302        private String serialize(Object task) {
303            XStream xStream = new XStream();
304            return xStream.toXML(task);
305        }
306    
307        private Object deserialize(String serializedRunnable) {
308            XStream xStream = new XStream();
309            return xStream.fromXML(serializedRunnable);
310        }
311    
312        private void execSQL(String sql) throws SQLException {
313            Connection c = dataSource.getConnection();
314            try {
315                PreparedStatement updateStatement = c.prepareStatement(sql);
316                try {
317                    updateStatement.execute();
318                } catch (SQLException e) {
319                    //ignore... table already exists.
320                } finally {
321                    updateStatement.close();
322                }
323            } finally {
324                c.close();
325            }
326        }
327    
328    }