001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.geronimo.timer.jdbc; 019 020 import com.thoughtworks.xstream.XStream; 021 import org.apache.geronimo.timer.PersistenceException; 022 import org.apache.geronimo.timer.Playback; 023 import org.apache.geronimo.timer.WorkInfo; 024 import org.apache.geronimo.timer.WorkerPersistence; 025 026 import javax.sql.DataSource; 027 import java.sql.Connection; 028 import java.sql.PreparedStatement; 029 import java.sql.ResultSet; 030 import java.sql.SQLException; 031 import java.sql.Types; 032 import java.util.ArrayList; 033 import java.util.Collection; 034 import java.util.Date; 035 036 /** 037 * TODO use an insert returning or stored procedure to insert. 
*
* <p>{@link WorkerPersistence} implementation that stores timer tasks as rows of a
* <code>timertasks</code> table reached through a {@link DataSource}. Row ids come
* either from a database sequence or from an identity column, depending on the
* <code>useSequence</code> constructor flag. User id and user info objects are
* stored as XML produced by XStream.
*
* @version $Rev: 476049 $ $Date: 2006-11-16 23:35:17 -0500 (Thu, 16 Nov 2006) $
*/
public class JDBCWorkerPersistence implements WorkerPersistence {

    private static final String createSequenceSQL = "create sequence timertasks_seq";
    private static final String createTableSQLWithSequence = "create table timertasks (id long primary key, serverid varchar(256) not null, timerkey varchar(256) not null, userid varchar(4096), userinfo varchar(4096), firsttime long not null, period long, atfixedrate boolean not null)";
    private static final String createTableSQLWithIdentity = "create table timertasks (id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), serverid varchar(256) not null, timerkey varchar(256) not null, userid varchar(4096), userinfo varchar(4096), firsttime NUMERIC(18,0) not null, period NUMERIC(18, 0), atfixedrate CHAR(1))";
    private static final String sequenceSQL = "select timertasks_seq.nextval";
    private static final String identitySQL = "values IDENTITY_VAL_LOCAL()";
    private static final String insertSQLWithSequence = "insert into timertasks (id, serverid, timerkey, userid, userinfo, firsttime, period, atfixedrate) values (?, ?, ?, ?, ?, ?, ?, ?)";
    private static final String insertSQLWithIdentity = "insert into timertasks (serverid, timerkey, userid, userinfo, firsttime, period, atfixedrate) values (?, ?, ?, ?, ?, ?, ?)";
    private static final String deleteSQL = "delete from timertasks where id=?";
    private static final String selectSQL = "select id, userid, userinfo, firsttime, period, atfixedrate from timertasks where serverid = ? and timerkey=?";
    private static final String fixedRateUpdateSQL = "update timertasks set firsttime = firsttime + period where id = ?";
    private static final String intervalUpdateSQL = "update timertasks set firsttime = ? where id = ?";
    private static final String selectByKeySQL = "select id from timertasks where serverid = ? and timerkey = ? and (userid = ? or ? is null)";

    private final String serverUniqueId;
    private final DataSource dataSource;
    private final boolean useSequence;

    /**
     * Creates the persistence and issues the DDL for the backing sequence/table.
     * Failures while executing the DDL are ignored (see {@link #execSQL(String)});
     * failures to obtain a connection or prepare the statement propagate.
     *
     * @param serverUniqueId value written to / matched against the <code>serverid</code> column
     * @param datasource     source of JDBC connections
     * @param useSequence    <code>true</code> to take ids from <code>timertasks_seq</code>,
     *                       <code>false</code> to use the identity column
     * @throws SQLException if a connection cannot be obtained or a DDL statement cannot be prepared
     */
    protected JDBCWorkerPersistence(String serverUniqueId, DataSource datasource, boolean useSequence) throws SQLException {
        this.serverUniqueId = serverUniqueId;
        this.dataSource = datasource;
        this.useSequence = useSequence;
        if (this.useSequence) {
            execSQL(createSequenceSQL);
            execSQL(createTableSQLWithSequence);
        } else {
            execSQL(createTableSQLWithIdentity);
        }
    }

    /**
     * Inserts a row for the given work item and stores the row id on it via
     * {@link WorkInfo#setId(long)}.
     *
     * @param workInfo the work item to persist
     * @throws PersistenceException if the insert does not affect exactly one row, no id
     *                              can be read, or any JDBC call fails
     */
    public void save(WorkInfo workInfo) throws PersistenceException {
        boolean completed = false;
        Connection c = getConnection();
        try {
            if (useSequence) {
                // The id is drawn (and set on workInfo) before the insert is attempted.
                long id = queryForId(c, sequenceSQL);
                workInfo.setId(id);
                PreparedStatement insertStatement = c.prepareStatement(insertSQLWithSequence);
                try {
                    insertStatement.setLong(1, id);
                    insertTask(insertStatement, 2, workInfo);
                } finally {
                    insertStatement.close();
                }
            } else {
                PreparedStatement insertStatement = c.prepareStatement(insertSQLWithIdentity);
                try {
                    insertTask(insertStatement, 1, workInfo);
                } finally {
                    insertStatement.close();
                }
                // Read the generated identity on the same connection that did the insert.
                workInfo.setId(queryForId(c, identitySQL));
            }
            completed = true;
        } catch (SQLException e) {
            throw new PersistenceException(e);
        } finally {
            close(c, completed);
        }
    }

    /**
     * Deletes the row with the given id.
     *
     * @param id id of the task row to delete
     * @throws PersistenceException if any JDBC call fails
     */
    public void cancel(long id) throws PersistenceException {
        executeWithLongs(deleteSQL, new long[]{id});
    }

    /**
     * Reads every task stored for this server id and the given key, rebuilds a
     * {@link WorkInfo} for each row and hands it to <code>playback.schedule</code>.
     *
     * @param key      timer key whose tasks are to be replayed
     * @param playback receiver of the rebuilt work items
     * @throws PersistenceException if any JDBC call fails
     */
    public void playback(String key, Playback playback) throws PersistenceException {
        boolean completed = false;
        Connection c = getConnection();
        try {
            PreparedStatement selectStatement = c.prepareStatement(selectSQL);
            try {
                // Bind inside the try so the statement is closed even if binding fails.
                selectStatement.setString(1, serverUniqueId);
                selectStatement.setString(2, key);
                ResultSet taskRS = selectStatement.executeQuery();
                try {
                    while (taskRS.next()) {
                        long id = taskRS.getLong(1);
                        Object userId = deserialize(taskRS.getString(2));
                        Object userInfo = deserialize(taskRS.getString(3));
                        Date time = new Date(taskRS.getLong(4));
                        // getLong yields 0 for SQL NULL; wasNull() tells the two apart.
                        long periodMillis = taskRS.getLong(5);
                        Long period = taskRS.wasNull() ? null : new Long(periodMillis);
                        boolean atFixedRate = taskRS.getBoolean(6);
                        //TODO make sure the reference to this is ok, meaning we can't use a handle to this WorkerPersistence.
                        WorkInfo workInfo = new WorkInfo(key, userId, userInfo, time, period, atFixedRate);
                        workInfo.setId(id);
                        playback.schedule(workInfo);
                    }
                } finally {
                    taskRS.close();
                }
            } finally {
                selectStatement.close();
            }
            completed = true;
        } catch (SQLException e) {
            throw new PersistenceException(e);
        } finally {
            close(c, completed);
        }
    }

    /**
     * Advances the stored <code>firsttime</code> of the given row by its stored period.
     *
     * @param id id of the task row to update
     * @throws PersistenceException if any JDBC call fails
     */
    public void fixedRateWorkPerformed(long id) throws PersistenceException {
        executeWithLongs(fixedRateUpdateSQL, new long[]{id});
    }

    /**
     * Sets the stored <code>firsttime</code> of the given row to the current time plus
     * <code>period</code>.
     *
     * @param id     id of the task row to update
     * @param period offset in milliseconds added to {@link System#currentTimeMillis()}
     * @throws PersistenceException if any JDBC call fails
     */
    public void intervalWorkPerformed(long id, long period) throws PersistenceException {
        long next = System.currentTimeMillis() + period;
        executeWithLongs(intervalUpdateSQL, new long[]{next, id});
    }

    /**
     * Returns the ids of the rows stored for this server id and the given key. When
     * <code>userId</code> is non-null only rows whose serialized user id equals its
     * serialized form are returned; when it is null the user id is not filtered on.
     *
     * @param key    timer key to look up
     * @param userId user id to match, or <code>null</code> for any
     * @return a collection of <code>Long</code> ids, possibly empty
     * @throws PersistenceException if any JDBC call fails
     */
    public Collection getIdsByKey(String key, Object userId) throws PersistenceException {
        boolean completed = false;
        Collection ids = new ArrayList();
        Connection c = getConnection();
        try {
            PreparedStatement selectStatement = c.prepareStatement(selectByKeySQL);
            try {
                // Bind inside the try so the statement is closed even if binding fails.
                selectStatement.setString(1, serverUniqueId);
                selectStatement.setString(2, key);
                if (userId == null) {
                    selectStatement.setNull(3, Types.VARCHAR);
                    selectStatement.setNull(4, Types.VARCHAR);
                } else {
                    String userIdString = serialize(userId);
                    selectStatement.setString(3, userIdString);
                    selectStatement.setString(4, userIdString);
                }
                ResultSet taskRS = selectStatement.executeQuery();
                try {
                    while (taskRS.next()) {
                        ids.add(new Long(taskRS.getLong(1)));
                    }
                } finally {
                    taskRS.close();
                }
            } finally {
                selectStatement.close();
            }
            completed = true;
        } catch (SQLException e) {
            throw new PersistenceException(e);
        } finally {
            close(c, completed);
        }
        return ids;
    }

    /** Binds the task columns starting at <code>firstIndex</code> and executes the insert. */
    private void insertTask(PreparedStatement insertStatement, int firstIndex, WorkInfo workInfo)
            throws SQLException, PersistenceException {
        int i = firstIndex;
        insertStatement.setString(i++, serverUniqueId);
        insertStatement.setString(i++, workInfo.getKey());
        insertStatement.setString(i++, serialize(workInfo.getUserId()));
        insertStatement.setString(i++, serialize(workInfo.getUserInfo()));
        insertStatement.setLong(i++, workInfo.getTime().getTime());
        if (workInfo.getPeriod() == null) {
            insertStatement.setNull(i++, Types.NUMERIC);
        } else {
            insertStatement.setLong(i++, workInfo.getPeriod().longValue());
        }
        insertStatement.setBoolean(i, workInfo.getAtFixedRate());
        if (insertStatement.executeUpdate() != 1) {
            throw new PersistenceException("Could not insert!");
        }
    }

    /** Runs a single-value query on the given connection and returns column 1 of its first row. */
    private long queryForId(Connection c, String sql) throws SQLException, PersistenceException {
        PreparedStatement idStatement = c.prepareStatement(sql);
        try {
            ResultSet idRS = idStatement.executeQuery();
            try {
                if (!idRS.next()) {
                    throw new PersistenceException("No id returned by: " + sql);
                }
                return idRS.getLong(1);
            } finally {
                idRS.close();
            }
        } finally {
            idStatement.close();
        }
    }

    /** Executes <code>sql</code> on a fresh connection with <code>args</code> bound, in order, as long parameters. */
    private void executeWithLongs(String sql, long[] args) throws PersistenceException {
        boolean completed = false;
        Connection c = getConnection();
        try {
            PreparedStatement statement = c.prepareStatement(sql);
            try {
                for (int i = 0; i < args.length; i++) {
                    statement.setLong(i + 1, args[i]);
                }
                statement.execute();
            } finally {
                statement.close();
            }
            completed = true;
        } catch (SQLException e) {
            throw new PersistenceException(e);
        } finally {
            close(c, completed);
        }
    }

    /** Converts an object to its XStream XML form. */
    private String serialize(Object task) {
        XStream xStream = new XStream();
        return xStream.toXML(task);
    }

    /**
     * Rebuilds an object from its XStream XML form; a <code>null</code> string (SQL NULL
     * column) yields <code>null</code>.
     */
    private Object deserialize(String serializedRunnable) {
        if (serializedRunnable == null) {
            return null;
        }
        // NOTE(review): XStream builds whatever types the XML names; this assumes the
        // timertasks table content is trusted -- confirm, or restrict permitted types.
        XStream xStream = new XStream();
        return xStream.fromXML(serializedRunnable);
    }

    /**
     * Executes a DDL statement on its own connection. A failure of the execution itself
     * is deliberately ignored; obtaining the connection, preparing and closing still
     * propagate their exceptions.
     */
    private void execSQL(String sql) throws SQLException {
        Connection c = dataSource.getConnection();
        try {
            PreparedStatement updateStatement = c.prepareStatement(sql);
            try {
                updateStatement.execute();
            } catch (SQLException ignored) {
                //ignore... table already exists.
            } finally {
                updateStatement.close();
            }
        } finally {
            c.close();
        }
    }

    /** Obtains a connection from the data source, wrapping any failure in a PersistenceException. */
    private Connection getConnection() throws PersistenceException {
        try {
            return dataSource.getConnection();
        } catch (Exception e) {
            throw new PersistenceException(e);
        }
    }

    /**
     * Closes the connection.
     *
     * @param c                  connection to close
     * @param reportSqlException <code>true</code> to rethrow a close failure as a
     *                           PersistenceException; <code>false</code> to swallow it so
     *                           that it cannot replace an exception already propagating
     *                           from the caller
     */
    private void close(Connection c, boolean reportSqlException) throws PersistenceException {
        try {
            c.close();
        } catch (Exception e) {
            if (reportSqlException) {
                throw new PersistenceException(e);
            }
        }
    }
}