Cluster requirements:
  • UC dedicated clusters not supported
  • UC standard clusters not supported
  • Livy clusters not supported
Performs file operations such as copy, move, and sync on different file systems.

Parameters

| Parameter | Description | Required |
|---|---|---|
| File System | Local for operations on the driver node file system, DBFS for operations on the Databricks file system, S3 for operations on the S3 object store | True |
| Operation | Operation to perform: Copy, Move, or Sync | True |
| Filename Regex | Regex to filter file names, e.g. stdlog.*.txt | False |
| Ignore empty files | Skip files whose size is 0 bytes | False |
| Recurse | Whether to perform the operation recursively. Default is False | False |
| Source Path | Path of the source file or directory, e.g. /dbfs/source_file.txt, dbfs:/source_file.txt, s3://source_bucket/source_prefix/filename.txt | True |
| Destination Path | Path of the destination file or directory, e.g. /dbfs/target_file.txt, dbfs:/target_file.txt, s3://target_bucket/target_prefix/filename.txt | True |
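The Filename Regex parameter is matched against the base file name only, not the full path, as the S3 sync example later in this section shows. A minimal sketch of that check, with illustrative values:

import re

# Illustrative values: fileRegex mirrors the Filename Regex parameter
# and is matched against the base file name, not the full key.
fileRegex = "stdlog.*.txt"
key = "source_prefix/logs/stdlog.2024-01-01.txt"

# re.match anchors at the start, so the pattern must match from the
# first character of the file name.
if re.compile(fileRegex).match(key.split("/")[-1]):
    print("file passes the filter")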
You can also operate on DBFS files through the Local file system by providing a path under /dbfs. This works because Databricks exposes DBFS through a FUSE mount, a secure virtual filesystem that makes cloud-stored files accessible as local files on the driver node.
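Because of the FUSE mount, ordinary Python file APIs work on DBFS paths. A minimal sketch, reusing the illustrative paths from the examples below:

import shutil

# The /dbfs FUSE mount exposes dbfs:/... paths as local files on the
# driver node, so standard library file operations work on them.
shutil.copy(
    "/dbfs/Prophecy/example/source/person.json",
    "/dbfs/Prophecy/example/target/person.json",
)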

Examples


Copy Single File

from pyspark.sql import SparkSession

def copy_file(spark: SparkSession):
    from pyspark.dbutils import DBUtils

    # Copy one file; recurse stays False because the source is a single file.
    DBUtils(spark).fs.cp(
        "dbfs:/Prophecy/example/source/person.json",
        "dbfs:/Prophecy/example/target/person.json",
        recurse=False,
    )

Copy All Files From A Directory

from pyspark.sql import SparkSession

def copy_file(spark: SparkSession):
    from pyspark.dbutils import DBUtils

    # Copy every file under the source directory; recurse=True descends
    # into subdirectories as well.
    DBUtils(spark).fs.cp(
        "dbfs:/Prophecy/example/source/",
        "dbfs:/Prophecy/example/target/",
        recurse=True,
    )

Move Files

from pyspark.sql import SparkSession

def move_file(spark: SparkSession):
    from pyspark.dbutils import DBUtils

    # Move the source directory to the destination.
    DBUtils(spark).fs.mv("dbfs:/Prophecy/example/source/", "dbfs:/Prophecy/example/target/", recurse=False)
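The same DBUtils calls also accept S3 URIs like those in the Parameters table. A minimal sketch, assuming the cluster has credentials for both (hypothetical) buckets:

from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils

def move_s3_file(spark: SparkSession):
    # Works only if the cluster can access both buckets, e.g. through
    # an instance profile. Bucket names and prefixes are placeholders.
    DBUtils(spark).fs.mv(
        "s3://source_bucket/source_prefix/filename.txt",
        "s3://target_bucket/target_prefix/filename.txt",
        recurse=False,
    )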

S3 - Sync Entire Directory

from pyspark.sql import SparkSession

def sync_file(spark: SparkSession):
    import re
    from urllib.parse import urlparse

    import boto3

    # Placeholder values standing in for the configured parameters;
    # defined here so the snippet is self-contained.
    mode = "sync"                # one of "copy", "move", "sync"
    fileRegex = ""               # optional Filename Regex filter
    ignoreEmptyFiles = False     # skip zero-byte files when True
    src_url = urlparse("s3://source_bucket/source_prefix/")
    dest_url = urlparse("s3://target_bucket/target_prefix/")
    src_bucket, src_prefix = src_url.netloc, src_url.path.lstrip("/")
    dest_bucket, dest_prefix = dest_url.netloc, dest_url.path.lstrip("/")
    s3 = boto3.client("s3")

    # Keys already present at the destination; sync only copies files
    # missing from this set. .get() guards against an empty destination,
    # where "Contents" is absent from the response.
    dest_files = set(
        f_object["Key"].lstrip("/")
        for f_object in s3.list_objects_v2(
            Bucket=dest_bucket, Prefix=dest_url.path.lstrip("/")
        ).get("Contents", [])
        if not f_object["Key"].endswith("/")
    )

    for obj in s3.list_objects_v2(
        Bucket=src_bucket, Prefix=src_url.path.lstrip("/")
    ).get("Contents", []):
        # Rewrite the source prefix to the destination prefix once per key.
        new_dest_prefix = re.sub(src_prefix, dest_prefix, obj["Key"], count=1)

        # Copy/move transfers every file; sync skips keys already present
        # at the destination. "Directory" keys (trailing slash) are ignored.
        if (mode in ["copy", "move"] and not obj["Key"].endswith("/")) or (
            not obj["Key"].endswith("/")
            and mode == "sync"
            and new_dest_prefix not in dest_files
        ):
            # Honor the "Ignore empty files" and "Filename Regex" parameters.
            if (
                ignoreEmptyFiles
                and s3.head_object(Bucket=src_bucket, Key=obj["Key"])["ContentLength"] == 0
            ) or (
                bool(fileRegex)
                and not re.compile(fileRegex).match(obj["Key"].split("/")[-1])
            ):
                continue

            s3.copy({"Bucket": src_bucket, "Key": obj["Key"]}, dest_bucket, new_dest_prefix)

            # A move deletes the source object after a successful copy.
            if mode == "move":
                s3.delete_object(Bucket=src_bucket, Key=obj["Key"])
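Sync only transfers keys that are missing from the destination, while copy transfers everything and move additionally deletes each source object after copying. A hypothetical invocation from a Databricks notebook, using the placeholder values defined in the sketch above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sync_file(spark)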