1 /**
2 *
3 * Copyright 2004 The Apache Software Foundation
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.geronimo.timer.jdbc;
19
20 import java.sql.Connection;
21 import java.sql.PreparedStatement;
22 import java.sql.ResultSet;
23 import java.sql.SQLException;
24 import java.sql.Types;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Date;
28 import javax.sql.DataSource;
29
30 import com.thoughtworks.xstream.XStream;
31 import org.apache.geronimo.timer.PersistenceException;
32 import org.apache.geronimo.timer.Playback;
33 import org.apache.geronimo.timer.WorkInfo;
34 import org.apache.geronimo.timer.WorkerPersistence;
35
36 /**
37 * TODO use an insert returning or stored procedure to insert.
38 *
39 * @version $Rev: 355877 $ $Date: 2005-12-10 18:48:27 -0800 (Sat, 10 Dec 2005) $
40 */
41 public class JDBCWorkerPersistence implements WorkerPersistence {
42
43 private static final String createSequenceSQL = "create sequence timertasks_seq";
44 private static final String createTableSQLWithSequence = "create table timertasks (id long primary key, serverid varchar(256) not null, timerkey varchar(256) not null, userid varchar(4096), userinfo varchar(4096), firsttime long not null, period long, atfixedrate boolean not null)";
45 private static final String createTableSQLWithIdentity = "create table timertasks (id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), serverid varchar(256) not null, timerkey varchar(256) not null, userid varchar(4096), userinfo varchar(4096), firsttime NUMERIC(18,0) not null, period NUMERIC(18, 0), atfixedrate CHAR(1))";
46 private static final String sequenceSQL = "select timertasks_seq.nextval";
47 private static final String identitySQL = "values IDENTITY_VAL_LOCAL()";
48 private static final String insertSQLWithSequence = "insert into timertasks (id, serverid, timerkey, userid, userinfo, firsttime, period, atfixedrate) values (?, ?, ?, ?, ?, ?, ?, ?)";
49 private static final String insertSQLWithIdentity = "insert into timertasks (serverid, timerkey, userid, userinfo, firsttime, period, atfixedrate) values (?, ?, ?, ?, ?, ?, ?)";
50 private static final String deleteSQL = "delete from timertasks where id=?";
51 private static final String selectSQL = "select id, userid, userinfo, firsttime, period, atfixedrate from timertasks where serverid = ? and timerkey=?";
52 private static final String fixedRateUpdateSQL = "update timertasks set firsttime = firsttime + period where id = ?";
53 private static final String intervalUpdateSQL = "update timertasks set firsttime = ? where id = ?";
54 private static final String selectByKeySQL = "select id from timertasks where serverid = ? and timerkey = ? and (userid = ? or ? is null)";
55
56 private final String serverUniqueId;
57 private final DataSource dataSource;
58 private boolean useSequence = false;
59
60 protected JDBCWorkerPersistence(String serverUniqueId, DataSource datasource, boolean useSequence) throws SQLException {
61 this.serverUniqueId = serverUniqueId;
62 this.dataSource = datasource;
63 this.useSequence = useSequence;
64 if (this.useSequence) {
65 execSQL(createSequenceSQL);
66 execSQL(createTableSQLWithSequence);
67 } else {
68 execSQL(createTableSQLWithIdentity);
69 }
70 }
71
72
73 public void save(WorkInfo workInfo) throws PersistenceException {
74 try {
75 Connection c = dataSource.getConnection();
76 try {
77 if (useSequence) {
78 long id;
79 PreparedStatement seqStatement = c.prepareStatement(sequenceSQL);
80 try {
81 ResultSet seqRS = seqStatement.executeQuery();
82 try {
83 seqRS.next();
84 id = seqRS.getLong(1);
85 } finally {
86 seqRS.close();
87 }
88 } finally {
89 seqStatement.close();
90 }
91 workInfo.setId(id);
92 PreparedStatement insertStatement = c.prepareStatement(insertSQLWithSequence);
93 try {
94 String serializedUserId = serialize(workInfo.getUserId());
95 String serializedUserKey = serialize(workInfo.getUserInfo());
96 insertStatement.setLong(1, id);
97 insertStatement.setString(2, serverUniqueId);
98 insertStatement.setString(3, workInfo.getKey());
99 insertStatement.setString(4, serializedUserId);
100 insertStatement.setString(5, serializedUserKey);
101 insertStatement.setLong(6, workInfo.getTime().getTime());
102 if (workInfo.getPeriod() == null) {
103 insertStatement.setNull(7, Types.NUMERIC);
104 } else {
105 insertStatement.setLong(7, workInfo.getPeriod().longValue());
106 }
107 insertStatement.setBoolean(8, workInfo.getAtFixedRate());
108 int result = insertStatement.executeUpdate();
109 if (result != 1) {
110 throw new PersistenceException("Could not insert!");
111 }
112 } finally {
113 insertStatement.close();
114 }
115 } else {
116 PreparedStatement insertStatement = c.prepareStatement(insertSQLWithIdentity);
117 try {
118 String serializedUserId = serialize(workInfo.getUserId());
119 String serializedUserKey = serialize(workInfo.getUserInfo());
120 insertStatement.setString(1, serverUniqueId);
121 insertStatement.setString(2, workInfo.getKey());
122 insertStatement.setString(3, serializedUserId);
123 insertStatement.setString(4, serializedUserKey);
124 insertStatement.setLong(5, workInfo.getTime().getTime());
125 if (workInfo.getPeriod() == null) {
126 insertStatement.setNull(6, Types.NUMERIC);
127 } else {
128 insertStatement.setLong(6, workInfo.getPeriod().longValue());
129 }
130 insertStatement.setBoolean(7, workInfo.getAtFixedRate());
131 int result = insertStatement.executeUpdate();
132 if (result != 1) {
133 throw new PersistenceException("Could not insert!");
134 }
135 } finally {
136 insertStatement.close();
137 }
138 long id;
139 PreparedStatement identityStatement = c.prepareStatement(identitySQL);
140 try {
141 ResultSet seqRS = identityStatement.executeQuery();
142 try {
143 seqRS.next();
144 id = seqRS.getLong(1);
145 } finally {
146 seqRS.close();
147 }
148 } finally {
149 identityStatement.close();
150 }
151 workInfo.setId(id);
152 }
153 } finally {
154 c.close();
155 }
156 } catch (SQLException e) {
157 throw new PersistenceException(e);
158 }
159 }
160
161 public void cancel(long id) throws PersistenceException {
162 try {
163 Connection c = dataSource.getConnection();
164 try {
165 PreparedStatement deleteStatement = c.prepareStatement(deleteSQL);
166 try {
167 deleteStatement.setLong(1, id);
168 deleteStatement.execute();
169 } finally {
170 deleteStatement.close();
171 }
172 } finally {
173 c.close();
174 }
175 } catch (SQLException e) {
176 throw new PersistenceException(e);
177 }
178
179 }
180
181 public void playback(String key, Playback playback) throws PersistenceException {
182 try {
183 Connection c = dataSource.getConnection();
184 try {
185 PreparedStatement selectStatement = c.prepareStatement(selectSQL);
186 selectStatement.setString(1, serverUniqueId);
187 selectStatement.setString(2, key);
188 try {
189 ResultSet taskRS = selectStatement.executeQuery();
190 try {
191 while (taskRS.next()) {
192 long id = taskRS.getLong(1);
193 String serizalizedUserId = taskRS.getString(2);
194 Object userId = deserialize(serizalizedUserId);
195 String serializedUserInfo = taskRS.getString(3);
196 Object userInfo = deserialize(serializedUserInfo);
197 long timeMillis = taskRS.getLong(4);
198 Date time = new Date(timeMillis);
199 Long period = null;
200 period = new Long(taskRS.getLong(5));
201 if (!taskRS.wasNull()) {
202 period = null;
203 }
204 boolean atFixedRate = taskRS.getBoolean(6);
205
206 WorkInfo workInfo = new WorkInfo(key, userId, userInfo, time, period, atFixedRate);
207 workInfo.setId(id);
208 playback.schedule(workInfo);
209 }
210 } finally {
211 taskRS.close();
212 }
213 } finally {
214 selectStatement.close();
215 }
216 } finally {
217 c.close();
218 }
219 } catch (SQLException e) {
220 throw new PersistenceException(e);
221 }
222 }
223
224 public void fixedRateWorkPerformed(long id) throws PersistenceException {
225 try {
226 Connection c = dataSource.getConnection();
227 try {
228 PreparedStatement updateStatement = c.prepareStatement(fixedRateUpdateSQL);
229 try {
230 updateStatement.setLong(1, id);
231 updateStatement.execute();
232 } finally {
233 updateStatement.close();
234 }
235 } finally {
236 c.close();
237 }
238 } catch (SQLException e) {
239 throw new PersistenceException(e);
240 }
241 }
242
243 public void intervalWorkPerformed(long id, long period) throws PersistenceException {
244 long next = System.currentTimeMillis() + period;
245 try {
246 Connection c = dataSource.getConnection();
247 try {
248 PreparedStatement updateStatement = c.prepareStatement(intervalUpdateSQL);
249 try {
250 updateStatement.setLong(1, next);
251 updateStatement.setLong(2, id);
252 updateStatement.execute();
253 } finally {
254 updateStatement.close();
255 }
256 } finally {
257 c.close();
258 }
259 } catch (SQLException e) {
260 throw new PersistenceException(e);
261 }
262 }
263
264 public Collection getIdsByKey(String key, Object userId) throws PersistenceException {
265 Collection ids = new ArrayList();
266 try {
267 Connection c = dataSource.getConnection();
268 try {
269 PreparedStatement selectStatement = c.prepareStatement(selectByKeySQL);
270 selectStatement.setString(1, serverUniqueId);
271 selectStatement.setString(2, key);
272 if (userId == null) {
273 selectStatement.setNull(3, Types.VARCHAR);
274 selectStatement.setNull(4, Types.VARCHAR);
275 } else {
276 String userIdString = serialize(userId);
277 selectStatement.setString(3, userIdString);
278 selectStatement.setString(4, userIdString);
279 }
280 try {
281 ResultSet taskRS = selectStatement.executeQuery();
282 try {
283 while (taskRS.next()) {
284 long id = taskRS.getLong(1);
285 ids.add(new Long(id));
286 }
287 } finally {
288 taskRS.close();
289 }
290 } finally {
291 selectStatement.close();
292 }
293 } finally {
294 c.close();
295 }
296 } catch (SQLException e) {
297 throw new PersistenceException(e);
298 }
299 return ids;
300 }
301
302 private String serialize(Object task) {
303 XStream xStream = new XStream();
304 return xStream.toXML(task);
305 }
306
307 private Object deserialize(String serializedRunnable) {
308 XStream xStream = new XStream();
309 return xStream.fromXML(serializedRunnable);
310 }
311
312 private void execSQL(String sql) throws SQLException {
313 Connection c = dataSource.getConnection();
314 try {
315 PreparedStatement updateStatement = c.prepareStatement(sql);
316 try {
317 updateStatement.execute();
318 } catch (SQLException e) {
319
320 } finally {
321 updateStatement.close();
322 }
323 } finally {
324 c.close();
325 }
326 }
327
328 }