CSV

CSV file data source

Properties

Properties supported by this source are shown below (* indicates required fields).
Property
Description
Name *
Name of the data source
Description
Description of the data source
Processing Mode
Select for batch and deselect for streaming. If 'Batch' is selected, the value of the switch is set to true; if 'Streaming' is selected, it is set to false. Default: true
Infer Schema
Infers the input schema automatically from the data. Requires one extra pass over the data. Default: false
Path *
Path to where the file or folder is located. Example: file://files/load.csv, hdfs://datamorph/path, s3a://input_bucket/path
Header
Uses the first line as the header row for column names. Default: false
Schema
Source schema to assist during the design of the pipeline
Delimiter
Sets a separator for each field and value. The separator can be one or more characters. Default: ,
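These source properties correspond to Spark's CSV reader options. A minimal PySpark sketch, assuming batch mode and the example path above (the option names below are Spark's, not the DataMorph UI labels):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("csv-source-example").getOrCreate()

# Use the first line as the header, infer the schema with one extra pass,
# and split fields on a comma delimiter.
df = (spark.read
      .option("header", True)
      .option("inferSchema", True)
      .option("delimiter", ",")
      .csv("file://files/load.csv"))

df.printSchema()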
Filename Column
Adds the absolute path of the file being read as a new column with the provided name. Example: file_name
Select Fields / Columns
Comma-separated list of fields/columns to select from the source. Example: firstName, lastName, address1, address2, city, zipcode. Default: *
Filter Expression
SQL WHERE clause for filtering records. This is also used to load partitions from the source. Examples: date=2022-01-01; year = 22 and month = 6 and day = 2
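Select Fields and Filter Expression behave like a column projection and a SQL WHERE clause applied on top of the read. A rough PySpark equivalent, assuming the dataframe df from the sketch above contains the referenced columns:

# Filter with a partition-style predicate, then keep only the listed columns.
filtered = df.where("year = 22 AND month = 6 AND day = 2")
selected = filtered.selectExpr("firstName", "lastName", "city", "zipcode")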
Distinct Values
Select rows with distinct column values. Default: false
Path Glob Filter
Optional glob pattern to only include files with paths matching the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. It does not change the behavior of partition discovery.
Recursive File Lookup
Recursively loads files; this disables partition inference. If your folder structure is partitioned as columnName=value (e.g. processDate=2022-01-26), using the recursive option WILL NOT read the partitions correctly. Default: false
Charset
Character set of the file. Default: UTF-8
Quote
Sets a single character for escaping quoted values where the separator can be part of the value. To turn off quoting, set this to an empty string rather than 'null'. For writing, if an empty string is set, \u0000 (the null character) is used. Example: "a","b" is parsed as a,b. Default: "
Escape
Sets a single character used for escaping quotes inside an already quoted value. Example: the escaped value " \" a , b \" " is parsed as " a , b ". Default: |
Comment
Sets a single character used for skipping lines beginning with this character. By default, it is disabled.
Date Format
Specifies the date format for date type fields in the source data. Custom date formats follow the formats from the Datetime Patterns Help Section. This applies to date type fields only when inferSchema is checked or a schema is provided to read the CSV file. Default: yyyy-MM-dd
Timestamp Format
Specifies the timestamp format for timestamp type field values in the source data. Custom timestamp formats follow the formats from the Datetime Patterns Help Section. This applies to timestamp type fields only when inferSchema is checked or a schema is provided to read the CSV file. Default: yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]
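Both format properties only take effect when the schema is inferred or supplied. A hedged PySpark sketch using Spark's dateFormat and timestampFormat options with the defaults shown above:

# Custom patterns are applied while parsing date and timestamp typed columns.
events = (spark.read
          .option("header", True)
          .option("inferSchema", True)
          .option("dateFormat", "yyyy-MM-dd")
          .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]")
          .csv("s3a://input_bucket/path"))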
Ignore Corrupt Files
If selected, jobs will continue to run when corrupted files are encountered, and the contents that have been read will still be returned
Ignore Missing Files
Select to ignore missing files while reading data from files
Modified Before
An optional timestamp to only include files with modification times occurring before the specified time. The provided timestamp must be in the form YYYY-MM-DDTHH:mm:ss. Example: 2020-06-01T13:00:00
Modified After
An optional timestamp to only include files with modification times occurring after the specified time. The provided timestamp must be in the form YYYY-MM-DDTHH:mm:ss. Example: 2020-06-01T13:00:00
Parser Library
Defaults to commons but can be set to univocity. Default: commons
Mode
Allows a mode for dealing with corrupt records during parsing. It supports the following case-insensitive modes. Spark tries to parse only the required columns in CSV under column pruning, so corrupt records can differ based on the required set of fields; this behavior can be controlled by spark.sql.csv.parser.columnPruning.enabled (enabled by default).
PERMISSIVE: When a corrupted record is met, puts the malformed string into the field configured by Column Name Of Corrupt Record and sets malformed fields to null. To keep corrupt records, a user can add a string type field named by Column Name Of Corrupt Record to a user-defined schema; if the schema does not have the field, corrupt records are dropped during parsing. A record with fewer or more tokens than the schema is not a corrupted record to CSV: when a record has fewer tokens than the length of the schema, the extra fields are set to null; when it has more tokens, the extra tokens are dropped.
DROPMALFORMED: Ignores the whole corrupted record.
FAILFAST: Throws an exception when corrupted records are met.
Default: PERMISSIVE
Column Name Of Corrupt Record
Allows renaming the field that holds the malformed string created by PERMISSIVE mode
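To keep malformed rows in PERMISSIVE mode, the user-defined schema must contain the corrupt-record column. A sketch of the equivalent Spark options; the schema, column names, and path are illustrative only:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("customerId", IntegerType()),
    StructField("city", StringType()),
    StructField("state", StringType()),
    StructField("_corrupt_record", StringType()),  # receives the raw malformed line
])

raw = (spark.read
       .schema(schema)
       .option("header", True)
       .option("mode", "PERMISSIVE")
       .option("columnNameOfCorruptRecord", "_corrupt_record")
       .csv("file://files/load.csv"))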
Null Value
Sets the string representation of a null value. The value specified in this field, if encountered in the source data, will be turned into null. For example, if the source data is as follows:
customerId, city, state
1, n/a, LA
2, Livingston, MI
and n/a is specified for nullValue in the CSV source, then DataMorph/Spark will read the data and replace every n/a encountered in the data with null:
customerId, city, state
1, null, LA
2, Livingston, MI
Empty Value
Sets the string representation of an empty value. The value specified in this field replaces empty strings in the source data. An empty string is represented in the data as "". If a value for a field is not present in the data, that is considered null, not an empty string. For example, if the source data is as follows:
customerId, city, state
1, "", LA
2, Livingston, MI
and "empty" is specified for emptyValue in the CSV source, then DataMorph/Spark will read the data and replace every "" encountered in the data with empty:
customerId, city, state
1, empty, LA
2, Livingston, MI
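Null Value and Empty Value map to Spark's nullValue and emptyValue CSV options. A sketch matching the two examples above (the path is illustrative):

# 'n/a' in the data is read as null; "" is read as the string 'empty'.
customers = (spark.read
             .option("header", True)
             .option("nullValue", "n/a")
             .option("emptyValue", "empty")
             .csv("file://files/customers.csv"))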
NaN Value
Sets the string representation of a non-number value. The value specified in this field, if encountered in the source data for any column of Numerical type, will be read as NaN by Spark. Default: NaN
Encoding
Decodes the CSV files using the given encoding type. Default: UTF-8
Character To Escape Quote Escaping
Sets a single character used for escaping the escape for the quote character. The default value is the escape character when the escape and quote characters are different, \0 otherwise. Example: the value "\ " a , b " \" is parsed as \ " a , b " \ when quote = ", quoteEscape = \ and charToEscapeQuoteEscaping = |
Enforce Schema
If selected, the specified or inferred schema will be forcibly applied to datasource files, and headers in CSV files will be ignored. If the option is not selected, the schema will be validated against all headers in CSV files when the header option is selected. Field names in the schema and column names in CSV headers are checked by their positions, taking into account spark.sql.caseSensitive. Though the default value is true, it is recommended to disable the enforceSchema option to avoid incorrect results. Default: true
Sampling Ratio
Defines the fraction of rows used for schema inference. Default: 1
Ignore Leading WhiteSpace
Flag indicating whether leading whitespace in values being read should be skipped. Default: false
Ignore Trailing WhiteSpace
Flag indicating whether trailing whitespace in values being read should be skipped. Default: false
Positive Inf
Sets the string representation of a positive infinity value. The value specified in this field, if encountered in the source data for any column of Numerical type, will be read as Infinity by Spark. Default: Inf
Negative Inf
Sets the string representation of a negative infinity value. The value specified in this field, if encountered in the source data for any column of Numerical type, will be read as -Infinity by Spark. Default: -Inf
Maximum Columns
Defines a hard limit of how many columns a record can have. Default: 20,480
Maximum Characters Per Column
Defines the maximum number of characters allowed for any given value being read. By default it is -1, meaning unlimited length. Default: -1
Unescaped Quote Handling
Defines how the CsvParser will handle values with unescaped quotes.
STOP_AT_CLOSING_QUOTE: If unescaped quotes are found in the input, accumulate the quote character and proceed parsing the value as a quoted value until a closing quote is found.
BACK_TO_DELIMITER: If unescaped quotes are found in the input, consider the value as an unquoted value. This makes the parser accumulate all characters of the current parsed value until the delimiter is found. If no delimiter is found in the value, the parser continues accumulating characters from the input until a delimiter or line ending is found.
STOP_AT_DELIMITER: If unescaped quotes are found in the input, consider the value as an unquoted value. This makes the parser accumulate all characters until the delimiter or a line ending is found in the input.
SKIP_VALUE: If unescaped quotes are found in the input, the content parsed for the given value is skipped and the value set in nullValue is produced instead.
RAISE_ERROR: If unescaped quotes are found in the input, a TextParsingException is thrown.
Default: STOP_AT_DELIMITER
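This property corresponds to Spark's unescapedQuoteHandling CSV option. A minimal sketch, with an illustrative path:

messy = (spark.read
         .option("header", True)
         .option("unescapedQuoteHandling", "STOP_AT_DELIMITER")
         .csv("file://files/messy_quotes.csv"))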
Multi-Line
Parses one record, which may span multiple lines, per file. Default: false
Line Separator
Defines the line separator that should be used for parsing. By default, \n, \r, and \r\n are all accepted.
Locale
Sets a locale as a language tag in IETF BCP 47 format. For instance, this is used while parsing dates and timestamps. Default: en-US
Normalize Column Names
Normalizes column names by replacing the special characters ,;{}()&/\n\t= and space with the given string. Example: _
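Normalize Column Names is applied by DataMorph after the read rather than being a Spark reader option. The exact replacement rule is not documented here, but a rough PySpark sketch of the same idea, replacing the listed special characters with '_', could look like this:

import re

def normalize(name: str) -> str:
    # Assumed rule: replace each of ,;{}()&/= plus whitespace with the replacement string.
    return re.sub(r"[ ,;{}()&/=\n\t]", "_", name)

normalized = df.toDF(*[normalize(c) for c in df.columns])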
Watermark Field Name
Field name to be used as the watermark. If unspecified in streaming mode, the default field name is 'tempWatermark'. Example: myConsumerWatermark. Default: tempWatermark
Watermark Value
Watermark value setting. Example: 10 seconds, 2 minutes
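In streaming mode the watermark properties correspond to Spark Structured Streaming's withWatermark. A sketch, assuming myConsumerWatermark is a timestamp column in the data; the schema below is illustrative (streaming CSV reads require an explicit schema):

from pyspark.sql.types import StructType, StructField, StringType, TimestampType

event_schema = StructType([
    StructField("customerId", StringType()),
    StructField("myConsumerWatermark", TimestampType()),
])

stream = (spark.readStream
          .option("header", True)
          .schema(event_schema)
          .csv("s3a://input_bucket/path"))

# Late data arriving more than 10 seconds behind the max event time is dropped.
with_watermark = stream.withWatermark("myConsumerWatermark", "10 seconds")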
Cache
Storage level used to cache the data read from this source:
MEMORY_ONLY: Persist data in memory only, in deserialized format.
MEMORY_AND_DISK: Persist data in memory; if enough memory is not available, evicted blocks are stored on disk.
MEMORY_ONLY_SER: Same as MEMORY_ONLY, but persists in serialized format. This is generally more space-efficient than the deserialized format, but more CPU-intensive to read.
MEMORY_AND_DISK_SER: Same as MEMORY_AND_DISK, but persists in serialized format.
DISK_ONLY: Persist the data partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2: Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP: Similar to MEMORY_ONLY_SER, but stores the data in off-heap memory. This requires off-heap memory to be enabled.
Default: NONE
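The Cache property maps to Spark storage levels. A PySpark sketch persisting the dataframe in memory with spill-over to disk:

from pyspark import StorageLevel

cached = df.persist(StorageLevel.MEMORY_AND_DISK)
cached.count()  # an action materializes the cache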