Commit 66abd535 authored by Blake Fitch

change to a streaming transfer of data from landing zone to irods, assuming no dups but handling them if they happen; hope to avoid blowing the memory cache in the uploader
parent 6ff0a472
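The heart of the change is a single pass over the file: each block is hashed and written out as it is read, so the uploader touches every byte exactly once and its memory stays bounded by the block size. A minimal sketch of that pattern, independent of iRODS; stream_once and dest are illustrative names, with dest standing in for any writable binary handle such as an iRODS data-object handle:

import hashlib

def stream_once(src_path, dest, block_size=2**24):
    # Copy src to dest in blocks, folding each block into both digests in
    # the same pass, instead of one full read to hash and a second to upload.
    hasher256 = hashlib.sha256()
    hasher512 = hashlib.sha512()
    size = 0
    with open(src_path, 'rb') as src:
        for block in iter(lambda: src.read(block_size), b''):
            dest.write(block)        # stream the block out immediately
            hasher256.update(block)  # the same block feeds both checksums
            hasher512.update(block)
            size += len(block)
    return hasher256.digest(), hasher512.digest(), size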
@@ -236,189 +236,255 @@ def pydict_to_irods_avus( coll_or_obj, metadata_dict ):
     return True

-# Returns None or the data_object irods ref.
-def confirmed_put( irods_sesh, file, new_obj_ipath, metadata_dict=None, datatype=None, block_size=(2**24) ):
-    logging.debug( "file: " + file
-                   + " new_obj_ipath: " + new_obj_ipath
-                   + " datatype: " + datatype
-                   + " block_size: " + str( block_size ) )
-    if irods_sesh == None:
-        logging.error( "irods_sesh == None" )
-        return None
-    # We want a hash of our file. Should we put this off in a separate thread?
-    try:
-        sha = hashlib.sha512()
-        with open(file, 'rb') as f:
-            for chunk in iter(lambda: f.read(block_size), b''):
-                sha.update(chunk)
-        new_obj_hash = base64.urlsafe_b64encode( sha.digest() ).rstrip(b'=').decode()
-    except Exception as ex:
-        logging.error( "Failed to create hash for new object."
-                       + " file: " + file
-                       + " ex: " + str(ex) )
-        return None
-    if metadata_dict == None:
-        metadata_dict = {}
-    # Put the hash and datatype into the dict
-    metadata_dict[ "SHA512SUMB64" ] = new_obj_hash
-    if datatype == None:
-        datatype = "UNKNOWN"
-    metadata_dict[ "DataType" ] = datatype
-    # Do a get to make sure this object does not exist. (If it does, we will check the checksum, etc.)
-    new_obj_exists_flag = False
-    try:
-        options = {kw.VERIFY_CHKSUM_KW: ''}
-        new_obj = irods_sesh.data_objects.get( new_obj_ipath, **options )
-        new_obj_exists_flag = True
-    except Exception as ex:
-        logging.debug( "Failed to get new object. (OK because we expect to create it.)"
-                       + " ipath: " + new_obj_ipath
-                       + " ex: " + str(ex) )
-    # X If the object exists, check that it matches what we would upload.
-    # X This should not happen under normal conditions.
-    # X If this object can not match the file and metadata, we go to the error path:
-    # X move the object to a new name with a time stamp.
-    if new_obj_exists_flag:
-        logging.error( "new object already exists (NOT OK because we expected to create it.)"
-                       + " file " + file
-                       + " ipath: " + new_obj_ipath )
-        # https://github.com/irods/python-irodsclient/blob/main/irods/test/collection_test.py#L317
-        digest = helpers.compute_sha256_digest( file )
-        if new_obj.checksum != "sha2:{}".format(digest) :
-            logging.error( "sha256 mismatch on local object vs object found in iRODS."
-                           + " file " + file
-                           + " new_obj_ipath " + new_obj_ipath )
-            # This means we are really trying to upload a different data object to the same archive ipath.
-            # Move the old object out of the way and date it.
-            try:
-                irods_sesh.data_objects.move( new_obj.path,
-                                              new_obj.path + "_" + datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S.%fZ%z") )
-            except Exception as ex:
-                logging.error( "Failed to move an existing object out of the way so can not upload current version."
-                               + " path " + new_obj.path
-                               + " ex: " + str( ex ) )
-                return None
-            # From here, we will try to upload a new object to replace the one moved aside.
-        else: # The object exists and the data sha matches, now check AVUs
-            # Get the existing object's metadata as a python dict
-            existing_obj_dict = irods_avus_to_pydict( new_obj )
-            if "SHA512SUMB64" in existing_obj_dict:
-                if metadata_dict[ "SHA512SUMB64" ] != existing_obj_dict[ "SHA512SUMB64" ] :
-                    logging.error( "WEIRD ERROR: iRODS sha256 sum matched but MrData sha512 AVU mismatched."
-                                   + ' metadata_dict[ "SHA512SUMB64" ] ' + metadata_dict[ "SHA512SUMB64" ]
-                                   + ' existing_obj_dict[ "SHA512SUMB64" ] ' + existing_obj_dict[ "SHA512SUMB64" ] )
-            else:
-                logging.error( "WEIRD ERROR: iRODS sha256 sum matched but object has no MrData SHA512SUMB64 AVU."
-                               + ' metadata_dict[ "SHA512SUMB64" ] ' + metadata_dict[ "SHA512SUMB64" ] )
-            if existing_obj_dict == None:
-                logging.error( "Object exists but fetching avus as py dict failed."
-                               + " ipath: " + new_obj_ipath )
-            # Ensure metadata matches. When it does, call it a successful dupe and move on.
-            # Note: the object must have all candidate AVUs, but it may have additional AVUs.
-            mismatch = False
-            for key, value in metadata_dict.items():
-                if key not in existing_obj_dict or existing_obj_dict[ key ] != value:
-                    if key in existing_obj_dict:
-                        dup_key_value = str( existing_obj_dict[ key ] ) + ". "
-                        metadata_dict[ key ] = dup_key_value # add the existing key/value so we end up with the union of AVUs
-                    else:
-                        dup_key_value = "not defined in existing_obj_dict. "
-                    logging.info( "Duplicated object metadata field does not match."
-                                  + " existing_obj_dict key: " + str( key ) + " value: " + dup_key_value
-                                  + " mismatch value: " + str( value )
-                                  + " file: " + file )
-                    mismatch = True
-            if mismatch:
-                logging.info( "Metadata mismatches. Will add missing AVUs."
-                              + " file: " + file )
-                try:
-                    pydict_to_irods_avus( new_obj, metadata_dict )
-                except Exception as ex:
-                    logging.error("Failed to create AVUs. ipath: " + new_obj_ipath + " ex: " + str(ex) )
-                    logging.error("metadata AVU dict >" + str( metadata_dict ) + "<" )
-                    return None
-                return new_obj
-            logging.warn( "Candidate file matches existing data object."
-                          + " ipath: " + new_obj_ipath )
-            return new_obj # will not upload since the object and metadata should now match.
-    # !! This code had a problem with a network error leaving the table row for an object
-    # !! with data_is_dirty set wrong. Table r_data_main
-    ## Expect to be able to put now.
-    ## The file is new and will be uploaded here along with metadata.
-    #new_obj_ipath_tmp = new_obj_ipath + "_tmp"
-    #
-    ## Check if the temp obj exists
-    #try:
-    #    options = {} # shouldn't care about the checksum {kw.VERIFY_CHKSUM_KW: ''}
-    #    new_obj = irods_sesh.data_objects.get( new_obj_ipath_tmp, **options )
-    #    # When there has not been an exception, delete the old tmp object
-    #    irods_sesh.data_objects.unlink( new_obj_ipath_tmp, force = True )
-    #    logging.error( "new_obj tmp file existed and has been deleted. This means we've tried to upload here before and failed."
-    #                   + " new_obj_ipath_tmp " + new_obj_ipath_tmp )
-    #except Exception as ex:
-    #    logging.debug( "Failed to get new_obj_ipath_tmp. (OK because we expect to create it.)"
-    #                   + " tmp ipath: " + new_obj_ipath_tmp
-    #                   + " ex: " + str(ex) )
-    #
+import irods
+import os
+import asyncio
+
+# Inline async function to read data blocks.
+async def async_read_block(file_handle, block_size):
+    while True:
+        # Read a block of data from the file in a separate thread so the event loop is not blocked
+        block = await asyncio.to_thread(file_handle.read, block_size)
+        if not block:
+            # End of file reached, break out of loop
+            break
+        # Yield the block of data back to the event loop
+        yield block
+
+async def async_streaming_transfer_file_to_object( args ):
+    file_pathname = args[ 0 ]
+    irods_obj = args[ 1 ]
+    block_size = args[ 2 ]
+    rc = 0
+    transfer_size = 0
+    # Initialize the checksum calculators
+    hasher256 = hashlib.sha256()
+    hasher512 = hashlib.sha512()
+    try:
+        with open( file_pathname, 'rb' ) as input_file:
+            # Read blocks of data from the file asynchronously
+            async for block in async_read_block( input_file, block_size ):
+                transfer_size += len(block)
+                # Write the block of data to the object in iRODS
+                irods_obj.write(block)
+                # Flush the buffer to ensure the block is written to iRODS immediately
+                irods_obj.flush()
+                # Update the checksums with the block of data
+                hasher256.update(block)
+                hasher512.update(block)
+    except Exception as ex:
+        logging.error( "Failed to transfer data from file to irods."
+                       + " file: " + file_pathname
+                       + " ex: " + str(ex) )
+        rc = -1
+    return rc, hasher256, hasher512, transfer_size
+
+def streaming_transfer_file_to_object( args ):
+    file_pathname = args[ 0 ]
+    irods_sesh = args[ 1 ]
+    irods_ipath = args[ 2 ]
+    block_size = args[ 3 ]
+    rc = 0
+    transfer_size = 0
+    # Initialize the checksum calculators
+    hasher256 = hashlib.sha256()
+    hasher512 = hashlib.sha512()
+    target_obj = irods_sesh.data_objects.get(irods_ipath)
+    try:
+        # irods_obj_handle = irods_sesh.data_objects.open( irods_ipath, mode='w', checksum=True)
+        irods_obj_handle = target_obj.open('w', checksum=True)
+    except Exception as ex:
+        logging.error("Failed to open irods object for transfer. file: " + file_pathname + " to irods ipath: " + irods_ipath + " ex: " + str(ex) )
+        return -1, hasher256, hasher512, 0
+    try:
+        with open( file_pathname, 'rb' ) as input_file:
+            while True:
+                # Read a block of data from the file
+                block = input_file.read( block_size )
+                if not block:
+                    # End of file reached, break out of loop
+                    break
+                transfer_size += len(block)
+                # Write the block of data to the object in iRODS
+                irods_obj_handle.write(block)
+                # Flush the buffer to ensure the block is written to iRODS immediately
+                irods_obj_handle.flush()
+                # Update the checksums with the block of data
+                hasher256.update(block)
+                hasher512.update(block)
+    except Exception as ex:
+        logging.error("Failed to transfer data from file: " + file_pathname + " to irods ipath: " + irods_ipath + " ex: " + str(ex) )
+        rc = -1
+    irods_obj_handle.close()
+    hash_digest256 = hasher256.digest()
+    hash_digest512 = hasher512.digest()
+    b64_digest = base64.b64encode(hash_digest256).decode()
+    new_obj_hash_256 = "sha2:{}".format(b64_digest)
+    logging.debug( "new_obj_hash_256: " + new_obj_hash_256 )
+    return rc, hash_digest256, hash_digest512, transfer_size
+
+# Returns None or the data_object irods ref.
+def confirmed_put( irods_sesh, file_pathname, new_obj_ipath, metadata_dict=None, datatype=None, block_size=(2**28) ):
+    logging.debug( "file_pathname: " + file_pathname
+                   + " new_obj_ipath: " + new_obj_ipath
+                   + " datatype: " + datatype
+                   + " block_size: " + str( block_size ) )
+    # NOTE: this routine uploads the file as a temporary iRODS object.
+    # Common sense suggests first checking whether the object already exists, a rare condition.
+    # The trouble is, that requires checksumming the new object before uploading, and
+    # for very large objects (many GB) this checksum results in pulling the file from
+    # a network file system twice due to blowing cache.
+    # So, we upload to temp, checksumming along the way, and then check for a pre-existing irods object.
+    if irods_sesh == None:
+        logging.error( "irods_sesh == None" )
+        return None
     # Make a timestamped partial tmp file name which, if the upload somehow fails here, will be left behind.
     # Hopefully this does not happen much, but we have seen at least one case so far.
     # Cleanup will need to be done elsewhere.
     new_obj_ipath_tmp = new_obj_ipath + ".part." + datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S.%fZ%z")
-    logging.debug("Uploading file: " + file + " to tmp ipath: " + new_obj_ipath_tmp )
+    logging.debug("Uploading file: " + file_pathname + " to tmp ipath: " + new_obj_ipath_tmp )
     try:
-        options = {kw.REG_CHKSUM_KW: '', kw.ALL_KW: ''}
-        irods_sesh.data_objects.put( file, new_obj_ipath_tmp, **options )
+        tmp_obj = irods_sesh.data_objects.create( new_obj_ipath_tmp )
     except Exception as axe:
-        logging.error( "Failed to put data object."
-                       + " file: " + file
+        logging.error( "Failed to create temp irods data object."
+                       + " file_pathname: " + file_pathname
                        + " tmp ipath: " + new_obj_ipath_tmp
                        + " ex: " + str(axe) )
         return None
-    # Need to do a get so we can apply metadata.
+    run_args = [ file_pathname, irods_sesh, new_obj_ipath_tmp, block_size ]
+    # Cannot use asyncio with < python 3.7. Cannot use goto-statement with > python 3.6.
+    # run_results = asyncio.run( streaming_transfer_file_to_object( run_args ) )
+    run_results = streaming_transfer_file_to_object( run_args )
+    transfer_rc = run_results[ 0 ]
+    hash_digest256 = run_results[ 1 ]
+    hash_digest512 = run_results[ 2 ]
+    transfer_size = run_results[ 3 ]
+    if transfer_rc < 0:
+        logging.error( "Failed to transfer file to irods object."
+                       + " file_pathname: " + file_pathname
+                       + " tmp ipath: " + new_obj_ipath_tmp )
+        return None
+    b64_digest256 = base64.b64encode(hash_digest256).decode()
+    new_obj_hash_256 = "sha2:{}".format(b64_digest256)
+    b64_digest512 = base64.b64encode(hash_digest512).decode()
+    new_obj_hash_512 = "sha2:{}".format(b64_digest512)
+    if metadata_dict == None:
+        metadata_dict = {}
+    # Put the hash and datatype into the dict
+    metadata_dict[ "SHA512SUMB64" ] = new_obj_hash_512
+    if datatype == None:
+        datatype = "UNKNOWN"
+    metadata_dict[ "DataType" ] = datatype
     try:
-        options = {kw.VERIFY_CHKSUM_KW: ''}
-        new_obj = irods_sesh.data_objects.get( new_obj_ipath_tmp, **options )
+        pydict_to_irods_avus( tmp_obj, metadata_dict )
     except Exception as ex:
-        logging.error("Failed to get path: " + new_obj_ipath_tmp + " ex: " + str(ex) )
+        logging.error("Failed to create AVUs on iRODS object. ipath: " + new_obj_ipath + " ex: " + str(ex) )
+        logging.error("metadata AVU dict >" + str( metadata_dict ) + "<" )
         return None
+    existing_obj = None
     try:
-        pydict_to_irods_avus( new_obj, metadata_dict )
+        existing_obj = irods_sesh.data_objects.get( new_obj_ipath )
+        logging.warn( "Object already exists at ipath: " + new_obj_ipath )
+    except irods_ex.DataObjectDoesNotExist:
+        logging.info( "DataObjectDoesNotExist " + new_obj_ipath )
+    except irods_ex.OBJ_PATH_DOES_NOT_EXIST:
+        logging.info( "OBJ_PATH_DOES_NOT_EXIST " + new_obj_ipath )
     except Exception as ex:
-        logging.error("Failed to create AVUs. ipath: " + new_obj_ipath_tmp + " file: " + file + " ex: " + str(ex) )
-        logging.error("metadata AVU dict >" + str( metadata_dict ) + "<" )
+        logging.error("Failed using data_objects.get() (but not DataObjectDoesNotExist). ipath: " + new_obj_ipath + " ex: " + str(ex) + " type " + str(type(ex)) )
         return None
-    # Move the tmp object to the proper object ipath.
+    mismatch = False
+    if existing_obj != None:
+        logging.warn( "Object exists and should not at ipath: " + new_obj_ipath )
+        # https://github.com/irods/python-irodsclient/blob/main/irods/test/collection_test.py#L317
+        # file_digest = helpers.compute_sha256_digest( file_pathname )
+        # logging.error( "irods checksum: " + str( existing_obj.checksum ) + " new_obj_hash_256 " + str( new_obj_hash_256 ) + " file_digest " + str( file_digest ) )
+        if existing_obj.checksum != new_obj_hash_256 :
+            mismatch = True
+            logging.error( "Object pre-exists but data mismatches."
+                           + " file_pathname " + file_pathname )
+        else:
+            # Get the existing object's metadata as a python dict
+            existing_obj_dict = irods_avus_to_pydict( existing_obj )
+            if existing_obj_dict != metadata_dict:
+                logging.error( "Object matches on data but mismatches metadata."
+                               + " new avu dict: " + str( metadata_dict )
+                               + " existing dict: " + str( existing_obj_dict )
+                               + " file_pathname: " + file_pathname )
+                mismatch = True
+        if mismatch:
+            # Mismatching existing object. Move the old object out of the way and date it.
+            dup_ipath = existing_obj.path + "_dup_" + datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S.%fZ%z")
+            try:
+                irods_sesh.data_objects.move( existing_obj.path, dup_ipath )
+            except Exception as ex:
+                logging.error( "Failed to move an existing object out of the way so can not upload current version."
+                               + " path " + existing_obj.path
+                               + " ex: " + str( ex ) )
+                return None
+            logging.info( "Moved mismatching existing object to ipath: " + dup_ipath )
+            # Fall through to move the newly uploaded object into place.
+        else:
+            # Matching existing object. Remove the tmp object we just uploaded and leave the old one.
+            try:
+                tmp_obj.unlink(force=True)
+            except Exception as ex:
+                logging.error( "Failed to unlink newly created, but duplicate, object."
+                               + " new_obj_ipath_tmp: " + str( new_obj_ipath_tmp )
+                               + " ex: " + str( ex ) )
+            logging.warn( "Existing object matched data and metadata at ipath: " + new_obj_ipath )
+            return existing_obj
+    # Move the tmp object to the proper object ipath.
     try:
-        irods_sesh.data_objects.move( new_obj.path, new_obj_ipath )
+        irods_sesh.data_objects.move( new_obj_ipath_tmp, new_obj_ipath )
     except Exception as ex:
         logging.error( "Failed to move the tmp object to the proper logical path."
-                       + " tmp path: " + new_obj.path
+                       + " tmp path: " + new_obj_ipath_tmp
@@ -426,18 +492,22 @@ def confirmed_put( irods_sesh, file, new_obj_ipath, metadata_dict=None, datatype=None, block_size=(2**24) ):
+ " ex: " + str( ex ) ) + " ex: " + str( ex ) )
return None return None
logging.debug( "file " + file + " in place at ipath " + new_obj_ipath ) logging.debug( "file " + file_pathname + " in place at ipath " + new_obj_ipath )
# Get the obj ref to return at the new ipath new_obj = None
try: try:
options = {kw.VERIFY_CHKSUM_KW: ''} new_obj = irods_sesh.data_objects.get( new_obj_ipath )
new_obj = irods_sesh.data_objects.get( new_obj_ipath, **options )
except Exception as ex: except Exception as ex:
logging.error("Failed to get path: " + new_obj_ipath + " ex: " + str(ex) ) logging.error("Failed data_onject.open(). ipath: " + new_obj_ipath + " ex: " + str(ex) )
return None return None
logging.debug( "Returning new object handle for: "
+ " file_pathname: " + file_pathname
+ " new_obj.ipath: " + new_obj.path )
return new_obj return new_obj
 # Bring in the experiment validation code
 sys.path.append('/home/mradmin/mrdata-common-public/pymods/mrdata_irods_config')
 import mrdata_irods_config as miconf
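For reference, a usage sketch of confirmed_put as changed by this commit. The connection details, zone, and paths below are hypothetical stand-ins, not values from this repository:

import logging
from irods.session import iRODSSession

# Hypothetical host, credentials, and paths, for illustration only.
with iRODSSession(host='irods.example.org', port=1247, user='mradmin',
                  password='changeme', zone='mrdataZone') as sesh:
    obj = confirmed_put(sesh,
                        '/landing_zone/run0001/scan.dat',
                        '/mrdataZone/home/mradmin/archive/run0001/scan.dat',
                        metadata_dict={'Project': 'MrData'},
                        datatype='RAW')
    if obj is None:
        logging.error('confirmed_put failed')

On success the function returns the iRODS data object now at the final ipath; on any failure, including a pre-existing object that could not be moved aside, it returns None.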
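The async_read_block helper added above is an async generator whose reads run on a worker thread via asyncio.to_thread; the commit's own comment notes that asyncio cannot be used on the deployment's Python, and asyncio.to_thread additionally requires Python 3.9+. If that path is ever revived, the generator would be consumed with async for; a minimal sketch under that assumption, with hash_via_async_reader as an illustrative name:

import asyncio
import hashlib

async def hash_via_async_reader(path, block_size=2**24):
    # Each read runs in a worker thread, so the event loop stays free for other work.
    hasher = hashlib.sha256()
    with open(path, 'rb') as fh:
        async for block in async_read_block(fh, block_size):
            hasher.update(block)
    return hasher.hexdigest()

# digest = asyncio.run(hash_via_async_reader('/tmp/example.dat'))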