Ticket #185: Upload_with_status_queue.patch
File Upload_with_status_queue.patch, 9.4 KB (added by dkocher, on Nov 11, 2010 at 3:31:55 PM) |
---|
-
source/ch/ethz/ssh2/sftp/SFTPv3Client.java
1092 1092 * A read is divided into multiple requests sent sequentially before 1093 1093 * reading any status from the server 1094 1094 */ 1095 private static class OutstandingRe quest {1095 private static class OutstandingReadRequest { 1096 1096 int req_id; 1097 1097 /** 1098 1098 * Read offset to request on server starting at the file offset for the first request. … … 1135 1135 /** 1136 1136 * @param parallelism 1137 1137 */ 1138 public void set DownloadRequestParallelism(int parallelism) {1138 public void setRequestParallelism(int parallelism) { 1139 1139 this.parallelism = Math.min(parallelism, DEFAULT_MAX_PARALLELISM); 1140 1140 log.log("setDownloadRequestParallelism:" + this.parallelism); 1141 1141 } … … 1143 1143 /** 1144 1144 * Mapping request ID to request. 1145 1145 */ 1146 Map<Integer, OutstandingRe quest> pendingQueue1147 = new HashMap<Integer, OutstandingRe quest>();1146 Map<Integer, OutstandingReadRequest> pendingReadQueue 1147 = new HashMap<Integer, OutstandingReadRequest>(); 1148 1148 1149 1149 /** 1150 1150 * Read bytes from a file in a parallel fashion. As many bytes as you want will be read. … … 1177 1177 int clientOffset = dstoff; 1178 1178 1179 1179 long serverOffset = fileOffset; 1180 for(OutstandingRe quest r : pendingQueue.values()) {1180 for(OutstandingReadRequest r : pendingReadQueue.values()) { 1181 1181 // Server offset should take pending requests into account. 1182 1182 serverOffset += r.len; 1183 1183 } 1184 1184 1185 1185 while(true) { 1186 / * If there was an error and no outstanding request - stop */1187 if((pending Queue.size() == 0) && errorOccured) {1186 // Stop if there was an error and no outstanding request 1187 if((pendingReadQueue.size() == 0) && errorOccured) { 1188 1188 break; 1189 1189 } 1190 1190 1191 / * Send as many requests as we are allowed to */1192 while(pending Queue.size() < parallelism) {1191 // Send as many requests as we are allowed to 1192 while(pendingReadQueue.size() < parallelism) { 1193 1193 if(errorOccured) { 1194 1194 break; 1195 1195 } 1196 / * Send the next request */1197 OutstandingRe quest req = new OutstandingRequest();1196 // Send the next read request 1197 OutstandingReadRequest req = new OutstandingReadRequest(); 1198 1198 req.req_id = generateNextRequestID(); 1199 1199 req.serverOffset = serverOffset; 1200 1200 req.len = (remaining > len) ? len : remaining; … … 1207 1207 1208 1208 sendReadRequest(req.req_id, handle, req.serverOffset, req.len); 1209 1209 1210 pending Queue.put(req.req_id, req);1210 pendingReadQueue.put(req.req_id, req); 1211 1211 } 1212 1213 /* Are we done? */ 1214 if(pendingQueue.size() == 0) { 1212 if(pendingReadQueue.size() == 0) { 1215 1213 break; 1216 1214 } 1217 1215 1218 / * No, receive a single answer */1216 // Receive a single answer 1219 1217 byte[] resp = receiveMessage(34000); 1220 1221 1218 TypesReader tr = new TypesReader(resp); 1222 1223 1219 int type = tr.readByte(); 1224 int rep_id = tr.readUINT32(); 1225 1226 /* Search the pending queue */ 1227 OutstandingRequest req = pendingQueue.get(rep_id); 1228 1229 /* Should shutdown here, no point in going on */ 1220 // Search the pending queue 1221 OutstandingReadRequest req = pendingReadQueue.remove(tr.readUINT32()); 1230 1222 if(null == req) { 1231 1223 throw new IOException("The server sent an invalid id field."); 1232 1224 } 1233 1234 pendingQueue.remove(rep_id); 1235 1236 /* Evaluate the answer */ 1225 // Evaluate the answer 1237 1226 if(type == Packet.SSH_FXP_STATUS) { 1238 1227 /* In any case, stop sending more packets */ 1239 1228 … … 1246 1235 } 1247 1236 // Flag to read all pending requests but don't send any more. 1248 1237 errorOccured = true; 1249 if(pending Queue.isEmpty()) {1238 if(pendingReadQueue.isEmpty()) { 1250 1239 if(ErrorCodes.SSH_FX_EOF == code) { 1251 1240 return -1; 1252 1241 } … … 1254 1243 } 1255 1244 } 1256 1245 else if(type == Packet.SSH_FXP_DATA) { 1257 / * OK, collect data */1246 // OK, collect data 1258 1247 int readLen = tr.readUINT32(); 1259 1248 1260 1249 if((readLen < 0) || (readLen > req.len)) { … … 1278 1267 log.log("Requesting again: " + req.serverOffset + "/" + req.len); 1279 1268 sendReadRequest(req.req_id, handle, req.serverOffset, req.len); 1280 1269 1281 pending Queue.put(req.req_id, req);1270 pendingReadQueue.put(req.req_id, req); 1282 1271 } 1283 1272 return readLen; 1284 1273 } … … 1291 1280 } 1292 1281 1293 1282 /** 1283 * A read is divided into multiple requests sent sequentially before 1284 * reading any status from the server 1285 */ 1286 private static class OutstandingStatusRequest { 1287 int req_id; 1288 } 1289 1290 /** 1291 * Mapping request ID to request. 1292 */ 1293 Map<Integer, OutstandingStatusRequest> pendingStatusQueue 1294 = new HashMap<Integer, OutstandingStatusRequest>(); 1295 1296 /** 1294 1297 * Write bytes to a file. If <code>len</code> > 32768, then the write operation will 1295 1298 * be split into multiple writes. 1296 1299 * … … 1304 1307 public void upload(SFTPv3FileHandle handle, long fileOffset, byte[] src, int srcoff, int len) throws IOException { 1305 1308 checkHandleValidAndOpen(handle); 1306 1309 1307 while(len > 0) { 1308 int writeRequestLen = len; 1310 // Send the next write request 1311 OutstandingStatusRequest req = new OutstandingStatusRequest(); 1312 req.req_id = generateNextRequestID(); 1309 1313 1310 if(writeRequestLen > 32768) { 1311 writeRequestLen = 32768; 1312 } 1313 1314 int req_id = generateNextRequestID(); 1315 1316 TypesWriter tw = new TypesWriter(); 1317 tw.writeString(handle.fileHandle, 0, handle.fileHandle.length); 1318 tw.writeUINT64(fileOffset); 1319 tw.writeString(src, srcoff, writeRequestLen); 1314 TypesWriter tw = new TypesWriter(); 1315 tw.writeString(handle.fileHandle, 0, handle.fileHandle.length); 1316 tw.writeUINT64(fileOffset); 1317 tw.writeString(src, srcoff, len); 1320 1318 1321 1322 sendMessage(Packet.SSH_FXP_WRITE,req_id, tw.getBytes());1319 log.log("Sending SSH_FXP_WRITE..."); 1320 sendMessage(Packet.SSH_FXP_WRITE, req.req_id, tw.getBytes()); 1323 1321 1324 fileOffset += writeRequestLen;1322 pendingStatusQueue.put(req.req_id, req); 1325 1323 1326 srcoff += writeRequestLen; 1327 len -= writeRequestLen; 1324 // Only read next status if parallelism reached 1325 while(pendingStatusQueue.size() >= parallelism) { 1326 this.readStatus(); 1327 } 1328 } 1328 1329 1329 byte[] resp = receiveMessage(34000); 1330 private void readStatus() throws IOException { 1331 byte[] resp = receiveMessage(34000); 1330 1332 1331 TypesReader tr = new TypesReader(resp); 1333 TypesReader tr = new TypesReader(resp); 1334 int type = tr.readByte(); 1332 1335 1333 int t = tr.readByte(); 1334 1335 int rep_id = tr.readUINT32(); 1336 if(rep_id != req_id) { 1337 throw new IOException("The server sent an invalid id field."); 1338 } 1336 // Search the pending queue 1337 OutstandingStatusRequest status = pendingStatusQueue.remove(tr.readUINT32()); 1338 if(null == status) { 1339 throw new IOException("The server sent an invalid id field."); 1340 } 1339 1341 1340 if(t != Packet.SSH_FXP_STATUS) { 1341 throw new IOException("The SFTP server sent an unexpected packet type (" + t + ")"); 1342 // Evaluate the answer 1343 if(type == Packet.SSH_FXP_STATUS) { 1344 // In any case, stop sending more packets 1345 int code = tr.readUINT32(); 1346 if(log.isEnabled()) { 1347 String[] desc = ErrorCodes.getDescription(code); 1348 log.log("Got SSH_FXP_STATUS (" + status.req_id + ") (" + ((desc != null) ? desc[0] : "UNKNOWN") + ")"); 1342 1349 } 1343 1344 int errorCode = tr.readUINT32(); 1345 1346 if(errorCode == ErrorCodes.SSH_FX_OK) { 1347 continue; 1350 if(code == ErrorCodes.SSH_FX_OK) { 1351 return; 1348 1352 } 1349 1350 String errorMessage = tr.readString(); 1351 1352 throw new SFTPException(errorMessage, errorCode); 1353 String msg = tr.readString(); 1354 throw new SFTPException(msg, code); 1353 1355 } 1356 throw new IOException("The SFTP server sent an unexpected packet type (" + type + ")"); 1354 1357 } 1355 1358 1356 1359 /** … … 1360 1363 * @throws IOException 1361 1364 */ 1362 1365 public void closeFile(SFTPv3FileHandle handle) throws IOException { 1363 if(handle == null) {1364 throw new IllegalArgumentException("the handle argument may not be null");1365 }1366 1366 try { 1367 while(!pendingStatusQueue.isEmpty()) { 1368 this.readStatus(); 1369 } 1367 1370 if(!handle.isClosed) { 1368 1371 closeHandle(handle.fileHandle); 1369 1372 }