/**
 *
 * Copyright 2004 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.geronimo.timer.jdbc;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import javax.sql.DataSource;

import com.thoughtworks.xstream.XStream;
import org.apache.geronimo.timer.PersistenceException;
import org.apache.geronimo.timer.Playback;
import org.apache.geronimo.timer.WorkInfo;
import org.apache.geronimo.timer.WorkerPersistence;

/**
 * {@link WorkerPersistence} implementation that stores timer tasks in a
 * <code>timertasks</code> table reached through a {@link DataSource}.
 * Every row is tagged with the server id given to the constructor, and the
 * user id / user info objects are stored as XStream XML strings.
 * <p>
 * Row ids come either from a database sequence (<code>timertasks_seq</code>)
 * or from an identity column, depending on the <code>useSequence</code>
 * constructor flag.
 * <p>
 * Each operation obtains its own connection from the data source and closes
 * it before returning; no explicit transaction handling is done here.
 *
 * TODO use an insert returning or stored procedure to insert.
 *
 * @version $Rev: 355877 $ $Date: 2005-12-10 18:48:27 -0800 (Sat, 10 Dec 2005) $
 */
public class JDBCWorkerPersistence implements WorkerPersistence {

    private static final String createSequenceSQL = "create sequence timertasks_seq";
    private static final String createTableSQLWithSequence = "create table timertasks (id long primary key, serverid varchar(256) not null, timerkey varchar(256) not null, userid varchar(4096), userinfo varchar(4096), firsttime long not null, period long, atfixedrate boolean not null)";
    private static final String createTableSQLWithIdentity = "create table timertasks (id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), serverid varchar(256) not null, timerkey varchar(256) not null, userid varchar(4096), userinfo varchar(4096), firsttime NUMERIC(18,0) not null, period NUMERIC(18, 0), atfixedrate CHAR(1))";
    private static final String sequenceSQL = "select timertasks_seq.nextval";
    private static final String identitySQL = "values IDENTITY_VAL_LOCAL()";
    private static final String insertSQLWithSequence = "insert into timertasks (id, serverid, timerkey, userid, userinfo, firsttime, period, atfixedrate) values (?, ?, ?, ?, ?, ?, ?, ?)";
    private static final String insertSQLWithIdentity = "insert into timertasks (serverid, timerkey, userid, userinfo, firsttime, period, atfixedrate) values (?, ?, ?, ?, ?, ?, ?)";
    private static final String deleteSQL = "delete from timertasks where id=?";
    private static final String selectSQL = "select id, userid, userinfo, firsttime, period, atfixedrate from timertasks where serverid = ? and timerkey=?";
    private static final String fixedRateUpdateSQL = "update timertasks set firsttime = firsttime + period where id = ?";
    private static final String intervalUpdateSQL = "update timertasks set firsttime = ? where id = ?";
    private static final String selectByKeySQL = "select id from timertasks where serverid = ? and timerkey = ? and (userid = ? or ? is null)";

    private final String serverUniqueId;
    private final DataSource dataSource;
    private final boolean useSequence;

    /**
     * Creates the persistence layer and attempts to create its schema.
     * With <code>useSequence</code> the sequence and the sequence-based table
     * are created; otherwise the identity-column table is created. Failures
     * while <em>executing</em> the DDL are ignored (see {@link #execSQL}).
     *
     * @param serverUniqueId value stored in the <code>serverid</code> column
     *                       and used to filter rows on read
     * @param datasource     source of JDBC connections
     * @param useSequence    <code>true</code> to allocate ids from
     *                       <code>timertasks_seq</code>, <code>false</code> to
     *                       rely on the identity column
     * @throws SQLException if a connection cannot be obtained or a DDL
     *                      statement cannot be prepared
     */
    protected JDBCWorkerPersistence(String serverUniqueId, DataSource datasource, boolean useSequence) throws SQLException {
        this.serverUniqueId = serverUniqueId;
        this.dataSource = datasource;
        this.useSequence = useSequence;
        if (this.useSequence) {
            execSQL(createSequenceSQL);
            execSQL(createTableSQLWithSequence);
        } else {
            execSQL(createTableSQLWithIdentity);
        }
    }

    /**
     * Inserts a row for the given work item and records the row id on it via
     * {@link WorkInfo#setId(long)}.
     * <p>
     * In sequence mode the id is fetched first and set on the work item before
     * the insert; in identity mode the row is inserted first and the generated
     * id is read back on the same connection.
     *
     * @param workInfo the task to persist
     * @throws PersistenceException if a JDBC error occurs, the insert does not
     *                              affect exactly one row, or no id is returned
     */
    public void save(WorkInfo workInfo) throws PersistenceException {
        try {
            Connection c = dataSource.getConnection();
            try {
                if (useSequence) {
                    long id = queryForId(c, sequenceSQL);
                    workInfo.setId(id);
                    insert(c, insertSQLWithSequence, new Long(id), workInfo);
                } else {
                    insert(c, insertSQLWithIdentity, null, workInfo);
                    // Must run on the same connection as the insert above.
                    workInfo.setId(queryForId(c, identitySQL));
                }
            } finally {
                c.close();
            }
        } catch (SQLException e) {
            throw new PersistenceException(e);
        }
    }

    /**
     * Deletes the row with the given id.
     *
     * @param id id of the task row to remove
     * @throws PersistenceException if a JDBC error occurs
     */
    public void cancel(long id) throws PersistenceException {
        executeUpdate(deleteSQL, new long[]{id});
    }

    /**
     * Reads every row stored for this server id and the given key, rebuilds a
     * {@link WorkInfo} for each one and hands it to
     * {@link Playback#schedule(WorkInfo)}.
     *
     * @param key      timer key to select rows for
     * @param playback receiver of the reconstructed work items
     * @throws PersistenceException if a JDBC error occurs
     */
    public void playback(String key, Playback playback) throws PersistenceException {
        try {
            Connection c = dataSource.getConnection();
            try {
                PreparedStatement selectStatement = c.prepareStatement(selectSQL);
                try {
                    // Bind inside the try so the statement is closed even if binding fails.
                    selectStatement.setString(1, serverUniqueId);
                    selectStatement.setString(2, key);
                    ResultSet taskRS = selectStatement.executeQuery();
                    try {
                        while (taskRS.next()) {
                            long id = taskRS.getLong(1);
                            Object userId = deserialize(taskRS.getString(2));
                            Object userInfo = deserialize(taskRS.getString(3));
                            Date time = new Date(taskRS.getLong(4));
                            // getLong() yields 0 for SQL NULL; wasNull() tells the two apart.
                            // A NULL period column maps to a null period, anything else is kept.
                            long periodMillis = taskRS.getLong(5);
                            Long period = taskRS.wasNull() ? null : new Long(periodMillis);
                            boolean atFixedRate = taskRS.getBoolean(6);
                            //TODO make sure the reference to this is ok, meaning we can't use a handle to this WorkerPersistence.
                            WorkInfo workInfo = new WorkInfo(key, userId, userInfo, time, period, atFixedRate);
                            workInfo.setId(id);
                            playback.schedule(workInfo);
                        }
                    } finally {
                        taskRS.close();
                    }
                } finally {
                    selectStatement.close();
                }
            } finally {
                c.close();
            }
        } catch (SQLException e) {
            throw new PersistenceException(e);
        }
    }

    /**
     * Advances the stored <code>firsttime</code> of the given row by its
     * stored <code>period</code>.
     *
     * @param id id of the task row to update
     * @throws PersistenceException if a JDBC error occurs
     */
    public void fixedRateWorkPerformed(long id) throws PersistenceException {
        executeUpdate(fixedRateUpdateSQL, new long[]{id});
    }

    /**
     * Sets the stored <code>firsttime</code> of the given row to the current
     * time plus <code>period</code>.
     *
     * @param id     id of the task row to update
     * @param period delay in milliseconds added to the current time
     * @throws PersistenceException if a JDBC error occurs
     */
    public void intervalWorkPerformed(long id, long period) throws PersistenceException {
        long next = System.currentTimeMillis() + period;
        executeUpdate(intervalUpdateSQL, new long[]{next, id});
    }

    /**
     * Returns the ids of the rows stored for this server id and the given key.
     * When <code>userId</code> is <code>null</code> both user-id parameters are
     * bound as SQL NULL, so the <code>? is null</code> branch of the query
     * matches and the user id is not used as a filter; otherwise only rows
     * whose serialized user id equals the serialized argument match.
     *
     * @param key    timer key to select rows for
     * @param userId user id to filter on, or <code>null</code> for no filter
     * @return a collection of <code>Long</code> ids, possibly empty
     * @throws PersistenceException if a JDBC error occurs
     */
    public Collection getIdsByKey(String key, Object userId) throws PersistenceException {
        Collection ids = new ArrayList();
        try {
            Connection c = dataSource.getConnection();
            try {
                PreparedStatement selectStatement = c.prepareStatement(selectByKeySQL);
                try {
                    // Bind inside the try so the statement is closed even if binding fails.
                    selectStatement.setString(1, serverUniqueId);
                    selectStatement.setString(2, key);
                    if (userId == null) {
                        selectStatement.setNull(3, Types.VARCHAR);
                        selectStatement.setNull(4, Types.VARCHAR);
                    } else {
                        String userIdString = serialize(userId);
                        selectStatement.setString(3, userIdString);
                        selectStatement.setString(4, userIdString);
                    }
                    ResultSet taskRS = selectStatement.executeQuery();
                    try {
                        while (taskRS.next()) {
                            ids.add(new Long(taskRS.getLong(1)));
                        }
                    } finally {
                        taskRS.close();
                    }
                } finally {
                    selectStatement.close();
                }
            } finally {
                c.close();
            }
        } catch (SQLException e) {
            throw new PersistenceException(e);
        }
        return ids;
    }

    /**
     * Runs one insert statement for the work item on the given connection.
     *
     * @param c        open connection to use
     * @param sql      insert statement; takes the id as its first parameter
     *                 when <code>id</code> is non-null
     * @param id       explicit row id, or <code>null</code> when the database
     *                 generates it
     * @param workInfo the task whose fields are bound
     * @throws SQLException         on JDBC failure
     * @throws PersistenceException if the insert does not affect exactly one row
     */
    private void insert(Connection c, String sql, Long id, WorkInfo workInfo) throws SQLException, PersistenceException {
        PreparedStatement insertStatement = c.prepareStatement(sql);
        try {
            String serializedUserId = serialize(workInfo.getUserId());
            String serializedUserInfo = serialize(workInfo.getUserInfo());
            int index = 1;
            if (id != null) {
                insertStatement.setLong(index++, id.longValue());
            }
            insertStatement.setString(index++, serverUniqueId);
            insertStatement.setString(index++, workInfo.getKey());
            insertStatement.setString(index++, serializedUserId);
            insertStatement.setString(index++, serializedUserInfo);
            insertStatement.setLong(index++, workInfo.getTime().getTime());
            if (workInfo.getPeriod() == null) {
                insertStatement.setNull(index++, Types.NUMERIC);
            } else {
                insertStatement.setLong(index++, workInfo.getPeriod().longValue());
            }
            insertStatement.setBoolean(index, workInfo.getAtFixedRate());
            if (insertStatement.executeUpdate() != 1) {
                throw new PersistenceException("Could not insert!");
            }
        } finally {
            insertStatement.close();
        }
    }

    /**
     * Runs a parameterless query and returns the first column of its first
     * row as a long. Used for both the sequence and the identity lookup.
     *
     * @param c   open connection to use
     * @param sql query expected to return a single id
     * @return the id read from the result
     * @throws SQLException         on JDBC failure
     * @throws PersistenceException if the query returns no row
     */
    private long queryForId(Connection c, String sql) throws SQLException, PersistenceException {
        PreparedStatement idStatement = c.prepareStatement(sql);
        try {
            ResultSet idRS = idStatement.executeQuery();
            try {
                if (!idRS.next()) {
                    throw new PersistenceException("No id returned by: " + sql);
                }
                return idRS.getLong(1);
            } finally {
                idRS.close();
            }
        } finally {
            idStatement.close();
        }
    }

    /**
     * Executes a statement whose parameters are all longs on a fresh
     * connection, closing statement and connection afterwards.
     *
     * @param sql  statement to execute
     * @param args values bound, in order, to parameters 1..n
     * @throws PersistenceException wrapping any JDBC failure
     */
    private void executeUpdate(String sql, long[] args) throws PersistenceException {
        try {
            Connection c = dataSource.getConnection();
            try {
                PreparedStatement updateStatement = c.prepareStatement(sql);
                try {
                    for (int i = 0; i < args.length; i++) {
                        updateStatement.setLong(i + 1, args[i]);
                    }
                    updateStatement.execute();
                } finally {
                    updateStatement.close();
                }
            } finally {
                c.close();
            }
        } catch (SQLException e) {
            throw new PersistenceException(e);
        }
    }

    /**
     * Converts an object to its XStream XML form for storage.
     */
    private String serialize(Object task) {
        XStream xStream = new XStream();
        return xStream.toXML(task);
    }

    /**
     * Rebuilds an object from the XStream XML read from the table.
     * NOTE(review): XStream will instantiate whatever types the XML names, so
     * this presumes the table contents are trusted - confirm.
     */
    private Object deserialize(String serializedRunnable) {
        XStream xStream = new XStream();
        return xStream.fromXML(serializedRunnable);
    }

    /**
     * Executes a DDL statement on a fresh connection, ignoring any failure
     * raised while executing it. Failures to obtain the connection or to
     * prepare the statement still propagate.
     *
     * @param sql statement to execute
     * @throws SQLException if the connection or statement cannot be set up
     *                      or closed
     */
    private void execSQL(String sql) throws SQLException {
        Connection c = dataSource.getConnection();
        try {
            PreparedStatement updateStatement = c.prepareStatement(sql);
            try {
                updateStatement.execute();
            } catch (SQLException ignored) {
                // Deliberate best-effort: the sequence/table is expected to
                // already exist on every start after the first one.
            } finally {
                updateStatement.close();
            }
        } finally {
            c.close();
        }
    }

}