diff --git a/pymods/irods_utils/irods_utils.py b/pymods/irods_utils/irods_utils.py
index c12ae9142eaca0c5e58d2c8e70ba8b6d406b1140..429282286246a9b340347b9d39cce463ff6509a6 100644
--- a/pymods/irods_utils/irods_utils.py
+++ b/pymods/irods_utils/irods_utils.py
@@ -236,189 +236,255 @@ def pydict_to_irods_avus( coll_or_obj, metadata_dict ):
     return True
 
-# Returns None or the data_object irods ref.
-def confirmed_put( irods_sesh, file, new_obj_ipath, metadata_dict=None, datatype=None, block_size=(2**24) ):
-    logging.debug( "file: " + file
-                   + " new_obj_ipath: " + new_obj_ipath
-                   + " datatype: " + datatype
-                   + " block_size: " + str( block_size ) )
+import irods
+import irods.exception as irods_ex   # for DataObjectDoesNotExist / OBJ_PATH_DOES_NOT_EXIST below
+import os
+import asyncio
 
-    if irods_sesh == None:
-        logging.error( "irods_sesh == None" )
-        return None
+# Inline async generator to read data blocks without blocking the event loop
+async def async_read_block(file_handle, block_size):
+    while True:
+        # Read a block of data from the file in a separate thread
+        block = await asyncio.to_thread(file_handle.read, block_size)
+        if not block:
+            # End of file reached, break out of loop
+            break
+        # Yield the block of data back to the consuming coroutine
+        yield block
 
-    # We want a hash of our file. Should put this off in a separate thread?
-    try:
-        sha = hashlib.sha512()
-        with open(file, 'rb') as f:
-            for chunk in iter(lambda: f.read(block_size), b''):
-                sha.update(chunk)
+async def async_streaming_transfer_file_to_object( args ):
+
+    file_pathname = args[ 0 ]
+    irods_obj_handle = args[ 1 ]   # an already-open iRODS data object handle
+    block_size = args[ 2 ]
+
+    rc = 0
+    transfer_size = 0
+
+    # Initialize the checksum calculators
+    hasher256 = hashlib.sha256()
+    hasher512 = hashlib.sha512()
 
-        new_obj_hash = base64.urlsafe_b64encode( sha.digest() ).rstrip(b'=').decode()
+    try:
+        with open( file_pathname, 'rb' ) as input_file:
+            # Read blocks of data from the file asynchronously
+            async for block in async_read_block( input_file, block_size ):
+                transfer_size += len(block)
+                # Write the block of data to the object in iRODS
+                irods_obj_handle.write(block)
+                # Flush the buffer to ensure the block is written to iRODS immediately
+                irods_obj_handle.flush()
+                # Update the checksums with the block of data
+                hasher256.update(block)
+                hasher512.update(block)
     except Exception as ex:
-        logging.error( "Failed to create hash for new object."
-                       + " file: " + file
-                       + " ex: " + str(ex) )
-        return None
+        logging.error("Failed to transfer data from file to irods. file: " + file_pathname + " ex: " + str(ex) )
+        #return None
+        rc = -1
 
-    if metadata_dict == None:
-        metadata_dict = {}
+    return rc, hasher256, hasher512, transfer_size
 
-    # Put the hash and datatype into the dict
-    metadata_dict[ "SHA512SUMB64" ] = new_obj_hash
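+# NOTE: the async helpers above are currently unused; confirmed_put() below calls
+# the synchronous streaming_transfer_file_to_object() instead (see the commented-out
+# asyncio.run() call there).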
+def streaming_transfer_file_to_object( args ):
 
-    if datatype == None:
-        datatype = "UNKNOWN"
+    file_pathname = args[ 0 ]
+    irods_sesh = args[ 1 ]
+    irods_ipath = args[ 2 ]
+    block_size = args[ 3 ]
 
-    metadata_dict[ "DataType" ] = datatype
+    rc = 0
+    transfer_size = 0
 
-    # Do a get to make sure this object does not exsist. (if it does, we will check the checksum, etc)
-    new_obj_exists_flag = False
-    try:
-        options = {kw.VERIFY_CHKSUM_KW: ''}
-        new_obj = irods_sesh.data_objects.get( new_obj_ipath, **options )
-        new_obj_exists_flag = True
-    except Exception as ex:
-        logging.debug( "Failed to get new object. (OK because we expect to create it.)"
-                       + " ipath: " + new_obj_ipath
-                       + " ex: " + str(ex) )
+    # Initialize the checksum calculator
+    hasher256 = hashlib.sha256()
+    hasher512 = hashlib.sha512()
 
-    # X If the object exists, check that it matches what we would upload.
-    # X This should not happen under normal conditions.
-    # X If this block can not match file and metadata, we go to the error path.
-    # Move object to a new name with a time stamp
-    if new_obj_exists_flag:
+    target_obj = irods_sesh.data_objects.get(irods_ipath)
 
-        logging.error( "new object already exists (NOT OK because we expected to create it.)"
-                       + " file " + file
-                       + " ipath: " + new_obj_ipath )
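+    # Assumption: checksum=True asks iRODS to register a checksum for this object
+    # when the handle is closed; the commented-out line below shows the equivalent
+    # session-level data_objects.open() call.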
+    try:
+        # irods_obj_handle = irods_sesh.data_objects.open( irods_ipath, mode='w', checksum=True)
+        irods_obj_handle = target_obj.open('w', checksum=True)
+    except Exception as ex:
+        logging.error("Failed to open irods object for transfer. file: " + file_pathname + " to irods ipath: " + irods_ipath + " ex: " + str(ex) )
+        return -1, hasher256, hasher512, 0
 
-        # https://github.com/irods/python-irodsclient/blob/main/irods/test/collection_test.py#L317
-        digest = helpers.compute_sha256_digest( file )
+    try:
+        with open( file_pathname, 'rb' ) as input_file:
+            while True:
+                # Read a block of data from the file
+                block = input_file.read( block_size )
+                if not block:
+                    # End of file reached, break out of loop
+                    break
+                transfer_size += len(block)
+                # Write the block of data to the object in iRODS
+                irods_obj_handle.write(block)
+                # Flush the buffer to ensure the block is written to iRODS immediately
+                irods_obj_handle.flush()
+                # Update the checksums with the block of data
+                hasher256.update(block)
+                hasher512.update(block)
+    except Exception as ex:
+        logging.error("Failed to transfer data from file: " + file_pathname + " to irods ipath: " + irods_ipath + " ex: " + str(ex) )
+        #return None
+        rc = -1
 
-        if new_obj.checksum != "sha2:{}".format(digest) :
-            logging.error( "sha256 mismatch on local object vs object found in iRODS."
-                           + " file " + file
-                           + " new_obj_ipath " + new_obj_ipath )
+    irods_obj_handle.close()
 
-            # This means we are really trying to upload a different data object to the same archive ipath
-            # Move the old object out of the way and date it.
-            try:
-                irods_sesh.data_objects.move( new_obj.path,
-                                              new_obj.path + "_" + datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S.%fZ%z") )
-            except Exception as ex:
-                logging.error( "Failed to move an existing object out of the way so can not upload current version."
-                               + " path " + new_obj.path
-                               + " ex: " + str( ex ) )
-                return None
+    hash_digest256 = hasher256.digest()
+    hash_digest512 = hasher512.digest()
 
-            # From here, we will try to upload a new object to replace the one moved aside
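+    # iRODS records a SHA-256 checksum as "sha2:" followed by the base64-encoded raw
+    # digest, so build the same form here; confirmed_put() compares it directly
+    # against existing_obj.checksum below.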
+    b64_digest = base64.b64encode(hash_digest256).decode()
+    new_obj_hash_256 = "sha2:{}".format(b64_digest)
+    logging.debug( "new_obj_hash_256: " + new_obj_hash_256 )
 
-        else: # The object exists and data sha matches, now check AVUs
+    return rc, hash_digest256, hash_digest512, transfer_size
 
-            # Get the existing objects metadata as a python dict
-            existing_obj_dict = irods_avus_to_pydict( new_obj )
-
-            if "SHA512SUMB64" in existing_obj_dict:
-                if metadata_dict[ "SHA512SUMB64" ] != existing_obj_dict[ "SHA512SUMB64" ] :
-                    logging.error( "WEIRD ERROR: iRODS sha226 sum matched but MrData sha512 AVU mismatched."
-                                   + ' metadata_dict[ "SHA512SUMB64" ] ' + metadata_dict[ "SHA512SUMB64" ]
-                                   + ' existing_obj_dict[ "SHA512SUMB64" ] ' + existing_obj_dict[ "SHA512SUMB64" ] )
-            else:
-                logging.error( "WEIRD ERROR: iRODS sha226 sum but object has no MrData SHA512SUMB64 AVU."
-                               + ' metadata_dict[ "SHA512SUMB64" ] ' + metadata_dict[ "SHA512SUMB64" ] )
-
-            if existing_obj_dict == None:
-                logging.error( "Object exists but fetching avus as py dict failed."
-                               + " ipath: " + new_obj_ipath )
-
-            # Ensure metadata matches. When it does, call it successful dupe and move on.
-            # Note: object must have all candidate AVUs but it may have additional AVUs.
-            mismatch = False
-            for key, value in metadata_dict.items():
-                if key not in existing_obj_dict or existing_obj_dict[ key ] != value:
-                    if key in existing_obj_dict:
-                        dup_key_value = str( existing_obj_dict[ key ] ) + ". "
-                        metadata_dict[ key ] = dup_key_value # add the existing key/value so to end up with the union of AVUs
-                    else:
-                        dup_key_value = "not defined in existing_obj_dict. "
-                    logging.info( "Duplicated object metadata field does not match."
-                                  + " existing_obj_dict key: " + str( key ) + " value: " + dup_key_value
-                                  + " mismatch value: " + str( value )
-                                  + " file: " + file )
-
-                    mismatch = True
-
-            if mismatch:
-                logging.info( "Metadata mismatches. Will add missing AVUs."
-                              + " file: " + file )
-                try:
-                    pydict_to_irods_avus( new_obj, metadata_dict )
-                except Exception as ex:
-                    logging.error("Failed to create AVUS. ipath: " + new_obj_ipath + " ex: " + str(ex) )
-                    logging.error("metadata AVU dict >" + str( metadata_dict ) + "<" )
-                    return None
-                return new_obj
-
-            logging.warn( "Candidate file matches existing data object."
-                          + " ipath: " + new_obj_ipath )
-            return new_obj # will not upload since the object and metadata should now match.
+# Returns None or the data_object irods ref.
+def confirmed_put( irods_sesh, file_pathname, new_obj_ipath, metadata_dict=None, datatype=None, block_size=(2**28) ):
+    logging.debug( "file_pathname: " + file_pathname
+                   + " new_obj_ipath: " + new_obj_ipath
+                   + " datatype: " + datatype
+                   + " block_size: " + str( block_size ) )
 
-    # !! This code had a problem with a network error leaving table row for object
-    # !! with data_is_dirty set wrong. Table r_data_main
-    ## Expect to be able to put now.
-    ## The file is new and will be uploaded here along with metadata.
-    #new_obj_ipath_tmp = new_obj_ipath + "_tmp"
-    #
-    ## Check if the temp obj exists
-    #try:
-    #    options = {} # shouldn't care about the checksum {kw.VERIFY_CHKSUM_KW: ''}
-    #    new_obj = irods_sesh.data_objects.get( new_obj_ipath_tmp, **options )
-    #    # When there has not been an exception, delete the old tmp object
-    #    irods_sesh.data_objects.unlink( new_obj_ipath_tmp, force = True )
-    #    logging.error( "new_obj tmp file existed and has been deleted. This means we've tried to upload here before and failed."
-    #                   + " new_obj_ipath_tmp " + new_obj_ipath_tmp )
-    #except Exception as ex:
-    #    logging.debug( "Failed to get new_obj_ipath_tmp. (OK because we expect to create it.)"
-    #                   + " tmp ipath: " + new_obj_ipath_tmp
-    #                   + " ex: " + str(ex) )
-    #
+    # NOTE: this routine uploads the file as a temporary iRODS object.
+    # Common sense suggests first checking whether the object already exists, the
+    # rare condition. The trouble is that doing so requires checksumming the new
+    # object before uploading, and for very large objects (many GB) that checksum
+    # means pulling the file from a network file system twice due to blowing cache.
+    # So, we upload to a temp object, checksumming along the way, and then check
+    # for a pre-existing irods object.
+
+    if irods_sesh == None:
+        logging.error( "irods_sesh == None" )
+        return None
 
     # Make a timestamped partial tmp file name which, if the upload somehow fails here, will be left behind
     # Hopefully this does not happen much, but we have seen at least on case so far.
     # Cleanup will need to be done elsewhere.
     new_obj_ipath_tmp = new_obj_ipath + ".part." + datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S.%fZ%z")
 
-    logging.debug("Uploading file: " + file + " to tmp ipath: " + new_obj_ipath_tmp )
+    logging.debug("Uploading file: " + file_pathname + " to tmp ipath: " + new_obj_ipath_tmp )
+
     try:
-        options = {kw.REG_CHKSUM_KW: '', kw.ALL_KW: ''}
-        irods_sesh.data_objects.put( file, new_obj_ipath_tmp, **options )
+        tmp_obj = irods_sesh.data_objects.create( new_obj_ipath_tmp )
     except Exception as axe:
-        logging.error( "Falied to put data object."
-                       + " file: " + file
-                       + " tmp ipath: " + new_obj_ipath_tmp
+        logging.error( "Failed to create temp irods data object."
+                       + " file_pathname: " + file_pathname
+                       + " tmp ipath: " + new_obj_ipath_tmp
                        + " ex: " + str(axe) )
         return None
 
-    # Need to do a get so we can apply metadata.
+    run_args = [ file_pathname, irods_sesh, new_obj_ipath_tmp, block_size ]
+
+    # Cannot use asyncio with < python 3.7. Cannot use goto-statement with > python 3.6
+    # run_results = asyncio.run( streaming_transfer_file_to_object( run_args ) )
+
+    run_results = streaming_transfer_file_to_object( run_args )
+
+    transfer_rc = run_results[ 0 ]
+    hash_digest256 = run_results[ 1 ]
+    hash_digest512 = run_results[ 2 ]
+    transfer_size = run_results[ 3 ]
+
+    if transfer_rc < 0:
+        logging.error( "Failed to transfer file to irods object."
+                       + " file_pathname: " + file_pathname
+                       + " tmp ipath: " + new_obj_ipath_tmp )
+        return None
+
+    b64_digest256 = base64.b64encode(hash_digest256).decode()
+    new_obj_hash_256 = "sha2:{}".format(b64_digest256)
+
+    b64_digest512 = base64.b64encode(hash_digest512).decode()
+    new_obj_hash_512 = "sha2:{}".format(b64_digest512)
+
+    if metadata_dict == None:
+        metadata_dict = {}
+
+    # Put the hash and datatype into the dict
+    metadata_dict[ "SHA512SUMB64" ] = new_obj_hash_512
+
+    if datatype == None:
+        datatype = "UNKNOWN"
+
+    metadata_dict[ "DataType" ] = datatype
+
     try:
-        options = {kw.VERIFY_CHKSUM_KW: ''}
-        new_obj = irods_sesh.data_objects.get( new_obj_ipath_tmp, **options )
+        pydict_to_irods_avus( tmp_obj, metadata_dict )
     except Exception as ex:
-        logging.error("Failed to get path: " + new_obj_ipath_tmp + " ex: " + str(ex) )
+        logging.error("Failed to create AVUS on iRODS object. ipath: " + new_obj_ipath + " ex: " + str(ex) )
+        logging.error("metadata AVU dict >" + str( metadata_dict ) + "<" )
         return None
 
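+    # Only now probe for a pre-existing object at the final ipath; per the NOTE above,
+    # probing first would mean reading (checksumming) very large files twice.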
ipath: " + new_obj_ipath_tmp + " file: " + file + " ex: " + str(ex) ) - logging.error("metadata AVU dict >" + str( metadata_dict ) + "<" ) + logging.error("Failed using data_onject.get() (but not DataObjectDoesNotExist) ipath: " + new_obj_ipath + " ex: " + str(ex) + " type " + str(type(ex)) ) return None - # Move the tmp object to the proper object ipath . + mismatch = False + + if existing_obj != None: + + logging.warn( "Object exists and should not at ipath: " + new_obj_ipath ) + + # https://github.com/irods/python-irodsclient/blob/main/irods/test/collection_test.py#L317 + # file_digest = helpers.compute_sha256_digest( file_pathname ) + + # logging.error( "irods checksum: " + str( existing_obj.checksum ) + " new_obj_hash_256 " + str( new_obj_hash_256 ) + " file_digest " + str( file_digest ) ) + + if existing_obj.checksum != new_obj_hash_256 : + mismatch = True + logging.error( "Object prexists but data mismatches." + + " file_pathname " + file_pathname ) + else: + # Get the existing objects metadata as a python dict + existing_obj_dict = irods_avus_to_pydict( existing_obj ) + + if existing_obj_dict != metadata_dict: + logging.error( "Object matches on data but mismatches metadata." + + " new avu dict: " + str( metadata_dict ) + + " existing dict: " + str( existing_obj_dict ) + + " file_pathname: " + file_pathname ) + mismatch = True + + if mismatch: + # Mismatch existing object. Move the old object out of the way and date it. + dup_ipath = existing_obj.path + "_dup_" + datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S.%fZ%z") + try: + irods_sesh.data_objects.move( existing_obj.path, dup_ipath ) + except Exception as ex: + logging.error( "Failed to move an existing object out of the way so can not upload current version." + + " path " + existing_obj.path + + " ex: " + str( ex ) ) + return None + logging.info( "Moved mismatching existing object to ipath: " + dup_ipath ) + # Fall through to move newly uploaded object into place. + else: + # Matching existing object. Remove the tmp object we just uploaded and leave the old one. + try: + tmp_obj.unlink(force=True) + except Exception as ex: + logging.error( "Failed to inlink newly created, but duplicate object." + + " new_obj_ipath_tmp: " + str( new_obj_ipath_tmp ) + + " ex: " + str( ex ) ) + + logging.warn( "Existing object matched data and metadata at ipath: " + + " ipath: " + new_obj_ipath ) + return existing_obj + + # Move the tmp object to the proper object ipath. try: - irods_sesh.data_objects.move( new_obj.path, new_obj_ipath ) + irods_sesh.data_objects.move( new_obj_ipath_tmp, new_obj_ipath ) except Exception as ex: logging.error( "Failed to move the tmp object to the proper logical path." + " tmp path: " + new_obj.path @@ -426,18 +492,22 @@ def confirmed_put( irods_sesh, file, new_obj_ipath, metadata_dict=None, datatype + " ex: " + str( ex ) ) return None - logging.debug( "file " + file + " in place at ipath " + new_obj_ipath ) + logging.debug( "file " + file_pathname + " in place at ipath " + new_obj_ipath ) - # Get the obj ref to return at the new ipath + new_obj = None try: - options = {kw.VERIFY_CHKSUM_KW: ''} - new_obj = irods_sesh.data_objects.get( new_obj_ipath, **options ) + new_obj = irods_sesh.data_objects.get( new_obj_ipath ) except Exception as ex: - logging.error("Failed to get path: " + new_obj_ipath + " ex: " + str(ex) ) + logging.error("Failed data_onject.open(). 
ipath: " + new_obj_ipath + " ex: " + str(ex) ) return None + logging.debug( "Returning new object handle for: " + + " file_pathname: " + file_pathname + + " new_obj.ipath: " + new_obj.path ) + return new_obj + # Bring in the experiment validation code sys.path.append('/home/mradmin/mrdata-common-public/pymods/mrdata_irods_config') import mrdata_irods_config as miconf