跳转至

Preprocessor Module

pipelines.pipelines.nodes.preprocessor.preprocessor

PreProcessor

Source code in pipelines/pipelines/nodes/preprocessor/preprocessor.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
class PreProcessor(BasePreProcessor):
    """
    Cleans and splits documents (dicts holding their text under the "content" key) into smaller documents.
    """

    def __init__(
        self,
        clean_whitespace: bool = True,
        clean_header_footer: bool = False,
        clean_empty_lines: bool = True,
        split_by: str = "word",
        split_length: int = 200,
        split_overlap: int = 0,
        split_answers: bool = False,
        split_respect_sentence_boundary: bool = True,
        language: str = "en",
    ):
        """
        :param clean_whitespace: Strip whitespaces before or after each line in the text.
        :param clean_header_footer: Use heuristic to remove footers and headers across different pages by searching
                                     for the longest common string. This heuristic uses exact matches and therefore
                                     works well for footers like "Copyright 2019 by XXX", but won't detect "Page 3 of 4"
                                     or similar.
        :param clean_empty_lines: Collapse consecutive empty lines in the text into a single empty line.
        :param split_by: Unit for splitting the document. Can be "word", "sentence", or "passage". Set to None to disable splitting.
        :param split_length: Max. number of the above split unit (e.g. words) that are allowed in one document. For instance, if
                             split_length -> 10 & split_by -> "sentence", then each output document will have at most 10 sentences.
        :param split_overlap: Overlap (in split units, e.g. words) between two adjacent documents after a split.
                              Setting this to a positive number essentially enables the sliding window approach.
                              For example, if split_by -> `word`,
                              split_length -> 5 & split_overlap -> 2, then the splits would be like:
                              [w1 w2 w3 w4 w5, w4 w5 w6 w7 w8, w7 w8 w9 w10 w11].
                              Set the value to 0 to ensure there is no overlap among the documents after splitting.
        :param split_answers: Treat the input as FAQ data. With split_by -> "passage" (which requires
                              split_respect_sentence_boundary=False), every line is expected to be
                              "question\\tanswer"; the question becomes the document content and the answer
                              is stored in meta["answer"].
        :param split_respect_sentence_boundary: Only applies when split_by -> `word`. If set to True, documents are
                                                never split in the middle of a sentence: each split contains only
                                                complete sentences and at most split_length words (except when a
                                                single sentence alone exceeds split_length).
        :param language: The language used by "nltk.tokenize.sent_tokenize" in iso639 format. Available options: "en", "es", "de", "fr" & many more.
                         The special value "chinese" bypasses NLTK and counts characters instead of words.
        """

        # Save init parameters to enable export of component config as YAML.
        # `language` is included so a component rebuilt from the exported config
        # tokenizes with the same language instead of silently falling back to "en".
        self.set_config(
            clean_whitespace=clean_whitespace,
            clean_header_footer=clean_header_footer,
            clean_empty_lines=clean_empty_lines,
            split_by=split_by,
            split_length=split_length,
            split_overlap=split_overlap,
            split_answers=split_answers,
            split_respect_sentence_boundary=split_respect_sentence_boundary,
            language=language,
        )

        # Ensure the NLTK punkt sentence tokenizer is available, downloading it if missing.
        # `nltk` is checked for truthiness, presumably because it may be None when the optional
        # dependency is not installed (TODO confirm against the module's import block).
        try:
            if nltk:
                nltk.data.find("tokenizers/punkt")
        except LookupError:
            try:
                if nltk:
                    nltk.download("punkt")
            except FileExistsError as error:
                logger.debug("NLTK punkt tokenizer seems to be already downloaded. Error message: %s", error)

        self.clean_whitespace = clean_whitespace
        self.clean_header_footer = clean_header_footer
        self.clean_empty_lines = clean_empty_lines
        self.split_by = split_by
        self.split_length = split_length
        self.split_overlap = split_overlap
        self.split_respect_sentence_boundary = split_respect_sentence_boundary
        self.language = language
        # Messages already logged once; used to avoid repeating the same warning for every sentence.
        self.print_log: Set[str] = set()
        self.split_answers = split_answers

    def process(
        self,
        documents: Union[dict, List[dict]],
        clean_whitespace: Optional[bool] = None,
        clean_header_footer: Optional[bool] = None,
        clean_empty_lines: Optional[bool] = None,
        split_by: Optional[str] = None,
        split_length: Optional[int] = None,
        split_overlap: Optional[int] = None,
        split_respect_sentence_boundary: Optional[bool] = None,
        split_answers: Optional[bool] = None,
    ) -> List[dict]:
        """
        Perform document cleaning and splitting. Can take a single document or a list of documents as input and returns a list of documents.

        Every option left as None falls back to the value given to `__init__`.

        :raises Exception: if `documents` is neither a dict nor a list.
        """

        kwargs = {
            "clean_whitespace": clean_whitespace,
            "clean_header_footer": clean_header_footer,
            "clean_empty_lines": clean_empty_lines,
            "split_by": split_by,
            "split_length": split_length,
            "split_overlap": split_overlap,
            "split_answers": split_answers,
            "split_respect_sentence_boundary": split_respect_sentence_boundary,
        }

        if isinstance(documents, dict):
            return self._process_single(document=documents, **kwargs)  # type: ignore
        if isinstance(documents, list):
            return self._process_batch(documents=list(documents), **kwargs)
        raise Exception("documents provided to PreProcessor.process() are neither a list nor a dict")

    def _process_single(
        self,
        document,
        clean_whitespace: Optional[bool] = None,
        clean_header_footer: Optional[bool] = None,
        clean_empty_lines: Optional[bool] = None,
        split_by: Optional[str] = None,
        split_length: Optional[int] = None,
        split_overlap: Optional[int] = None,
        split_answers: Optional[bool] = None,
        split_respect_sentence_boundary: Optional[bool] = None,
    ) -> List[dict]:
        """Clean and split one document; None options fall back to the instance defaults."""

        if clean_whitespace is None:
            clean_whitespace = self.clean_whitespace
        if clean_header_footer is None:
            clean_header_footer = self.clean_header_footer
        if clean_empty_lines is None:
            clean_empty_lines = self.clean_empty_lines
        if split_by is None:
            split_by = self.split_by
        if split_length is None:
            split_length = self.split_length
        if split_overlap is None:
            split_overlap = self.split_overlap
        if split_respect_sentence_boundary is None:
            split_respect_sentence_boundary = self.split_respect_sentence_boundary
        if split_answers is None:
            split_answers = self.split_answers

        cleaned_document = self.clean(
            document=document,
            clean_whitespace=clean_whitespace,
            clean_header_footer=clean_header_footer,
            clean_empty_lines=clean_empty_lines,
        )
        return self.split(
            document=cleaned_document,
            split_by=split_by,
            split_length=split_length,
            split_overlap=split_overlap,
            split_answers=split_answers,
            split_respect_sentence_boundary=split_respect_sentence_boundary,
        )

    def _process_batch(self, documents: List[dict], **kwargs) -> List[dict]:
        """Process each document (with a tqdm progress bar) and flatten the resulting splits into one list."""
        nested_docs = [self._process_single(d, **kwargs) for d in tqdm(documents, unit="docs")]
        return [d for x in nested_docs for d in x]

    def clean(
        self,
        document: dict,
        clean_whitespace: bool,
        clean_header_footer: bool,
        clean_empty_lines: bool,
    ) -> dict:
        """
        Perform document cleaning on a single document and return a single document. This method will deal with whitespaces, headers, footers
        and empty lines. Its exact functionality is defined by the parameters passed into PreProcessor.__init__().

        Note: the input dict is modified in place (its "content" is overwritten) and returned.
        """
        text = document["content"]
        if clean_header_footer:
            text = self._find_and_remove_header_footer(
                text, n_chars=300, n_first_pages_to_ignore=1, n_last_pages_to_ignore=1
            )

        if clean_whitespace:
            text = "\n".join(line.strip() for line in text.splitlines())

        if clean_empty_lines:
            # Any run of 2+ newlines (i.e. one or more empty lines) becomes exactly one empty line.
            text = re.sub(r"\n\n+", "\n\n", text)

        document["content"] = text
        return document

    def split(
        self,
        document: dict,
        split_by: str,
        split_length: int,
        split_overlap: int,
        split_answers: bool,
        split_respect_sentence_boundary: bool,
    ) -> List[dict]:
        """Perform document splitting on a single document. This method can split on different units, at different lengths,
        with different strides. It can also respect sentence boundaries. Its exact functionality is defined by
        the parameters passed into PreProcessor.__init__(). Takes a single document as input and returns a list of documents.

        Each returned document is a deep copy of the input with its own "content" and meta["_split_id"].

        :raises Exception: if split_by is set but split_length is not, or if an FAQ line has more than two
                           tab-separated columns.
        :raises NotImplementedError: for an unsupported split_by, or split_respect_sentence_boundary with split_by != "word".
        :raises ValueError: if windowed splitting is requested with split_overlap >= split_length.
        """

        if not split_by:
            return [document]

        if not split_length:
            raise Exception("split_length needs be set when using split_by.")

        if split_respect_sentence_boundary and split_by != "word":
            raise NotImplementedError(
                "'split_respect_sentence_boundary=True' is only compatible with split_by='word'."
            )

        text = document["content"]

        if split_respect_sentence_boundary and split_by == "word":
            text_splits = self._split_respecting_sentences(text, split_length, split_overlap)
        else:
            text_splits = self._split_into_windows(text, split_by, split_length, split_overlap, split_answers)

        # create new document dicts for each text split
        documents = []
        for i, txt in enumerate(text_splits):
            doc = deepcopy(document)
            doc["content"] = txt

            if doc.get("meta") is None:
                doc["meta"] = {}
            if split_answers:
                text_arr = doc["content"].split("\t")
                if len(text_arr) > 2:
                    raise Exception("Each line text must be two columns and separated by \t")
                # Maybe empty lines
                if len(text_arr) == 1:
                    logger.info("Some lines in your text cannot parse into question and text, maybe empty lines")
                    continue
                query, answer = text_arr
                doc["content"] = query
                doc["meta"]["answer"] = answer
            # _split_id is the index into text_splits, so skipped FAQ lines leave gaps in the ids.
            doc["meta"]["_split_id"] = i
            documents.append(doc)

        return documents

    def _count_units(self, text: str) -> int:
        """Return the length of `text` in split units: characters for "chinese", space-separated words otherwise."""
        if self.language == "chinese":
            return len(text)
        return len(text.split(" "))

    def _split_respecting_sentences(self, text: str, split_length: int, split_overlap: int) -> List[str]:
        """Group whole sentences into splits of at most `split_length` units, with an optional sentence-level overlap."""
        if self.language == "chinese":
            # Iterating over a str yields single characters, so for "chinese" every character is
            # treated as one "sentence" and splits are effectively made by character count.
            sentences = text
        else:
            language_name = iso639_to_nltk.get(self.language)
            sentences = nltk.tokenize.sent_tokenize(text, language=language_name)

        word_count = 0
        list_splits: List[List[str]] = []
        current_slice: List[str] = []
        for sen in sentences:
            current_word_count = self._count_units(sen)
            if current_word_count > split_length:
                long_sentence_message = "One or more sentence found with word count higher than the split length."
                if long_sentence_message not in self.print_log:
                    self.print_log.add(long_sentence_message)
                    logger.warning(long_sentence_message)
            if word_count + current_word_count > split_length:
                list_splits.append(current_slice)
                # Enable split_stride with split_by='word' while respecting sentence boundaries:
                # carry over trailing sentences until at least split_overlap units are covered.
                if split_overlap:
                    overlap = []
                    w_count = 0
                    for s in reversed(current_slice):
                        if w_count >= split_overlap:
                            break
                        overlap.append(s)
                        w_count += self._count_units(s)
                    current_slice = list(reversed(overlap))
                    word_count = w_count
                else:
                    current_slice = []
                    word_count = 0
            current_slice.append(sen)
            word_count += current_word_count
        if current_slice:
            list_splits.append(current_slice)

        joiner = "" if self.language == "chinese" else " "
        # Empty groups (e.g. flushed before an over-long first sentence) produce "" and are dropped.
        return [txt for txt in (joiner.join(sl) for sl in list_splits) if len(txt) > 0]

    def _split_into_windows(
        self, text: str, split_by: str, split_length: int, split_overlap: int, split_answers: bool
    ) -> List[str]:
        """Split `text` into passage/sentence/word elements and join them in (possibly overlapping) windows."""
        if split_by == "passage":
            if split_answers:
                # FAQ text holds one "question\tanswer" pair per line; each line is kept as its
                # own split and must not be merged into fixed-length windows.
                return text.split("\n")
            elements = text.split("\n\n")
        elif split_by == "sentence":
            language_name = iso639_to_nltk.get(self.language)
            elements = nltk.tokenize.sent_tokenize(text, language=language_name)
        elif split_by == "word":
            elements = text.split(" ")
        else:
            raise NotImplementedError("PreProcessor only supports 'passage', 'sentence' or 'word' split_by options.")

        if split_overlap >= split_length:
            raise ValueError(
                f"split_overlap ({split_overlap}) must be smaller than split_length ({split_length})."
            )

        # concatenate individual elements based on split_length & split_stride;
        # windowed() pads the last window with None, which is filtered out here.
        text_splits = []
        for seg in windowed(elements, n=split_length, step=split_length - split_overlap):
            txt = " ".join(t for t in seg if t is not None)
            if len(txt) > 0:
                text_splits.append(txt)
        return text_splits

    def _find_and_remove_header_footer(
        self, text: str, n_chars: int, n_first_pages_to_ignore: int, n_last_pages_to_ignore: int
    ) -> str:
        """
        Heuristic to find footers and headers across different pages by searching for the longest common string.
        For headers we only search in the first n_chars characters (for footer: last n_chars).
        Pages are separated by form feed characters ("\\f").
        Note: This heuristic uses exact matches and therefore works well for footers like "Copyright 2019 by XXX",
         but won't detect "Page 3 of 4" or similar.

        :param n_chars: number of first/last characters where the header/footer shall be searched in
        :param n_first_pages_to_ignore: number of first pages to ignore (e.g. TOCs often don't contain footer/header)
        :param n_last_pages_to_ignore: number of last pages to ignore (0 means none are ignored)
        :return: the text with the detected header and footer removed from every page
        """

        pages = text.split("\f")
        # Use an explicit stop index: `pages[a:-0]` is empty, which silently disabled the
        # detection whenever n_last_pages_to_ignore was 0.
        pages_to_search = slice(n_first_pages_to_ignore, max(0, len(pages) - n_last_pages_to_ignore))

        # header
        start_of_pages = [p[:n_chars] for p in pages[pages_to_search]]
        found_header = self._find_longest_common_ngram(start_of_pages)
        if found_header:
            pages = [page.replace(found_header, "") for page in pages]

        # footer (searched on the pages after header removal)
        end_of_pages = [p[-n_chars:] for p in pages[pages_to_search]]
        found_footer = self._find_longest_common_ngram(end_of_pages)
        if found_footer:
            pages = [page.replace(found_footer, "") for page in pages]
        logger.debug(f"Removed header '{found_header}' and footer '{found_footer}' in document")
        return "\f".join(pages)

    def _ngram(self, seq: str, n: int) -> Generator[str, None, None]:
        """
        Return ngram (of tokens - currently split by whitespace)
        :param seq: str, string from which the ngram shall be created
        :param n: int, n of ngram
        :return: generator of str, each ngram as string
        """

        # In order to maintain the original whitespace, but still consider \n and \t for n-gram tokenization,
        # we add a space here and remove it after creation of the ngrams again (see below)
        seq = seq.replace("\n", " \n")
        seq = seq.replace("\t", " \t")

        words = seq.split(" ")
        ngrams = (
            " ".join(words[i : i + n]).replace(" \n", "\n").replace(" \t", "\t") for i in range(0, len(words) - n + 1)
        )

        return ngrams

    def _allngram(self, seq: str, min_ngram: int, max_ngram: int) -> Set[str]:
        """Return the set of all ngrams of `seq` with min_ngram <= n < max_ngram (upper bound exclusive)."""
        lengths = range(min_ngram, max_ngram) if max_ngram else range(min_ngram, len(seq))
        ngrams = map(partial(self._ngram, seq), lengths)
        return set(chain.from_iterable(ngrams))

    def _find_longest_common_ngram(
        self, sequences: List[str], max_ngram: int = 30, min_ngram: int = 3
    ) -> Optional[str]:
        """
        Find the longest common ngram across different text sequences (e.g. start of pages).
        Considering all ngrams between the specified range. Helpful for finding footers, headers etc.

        :param sequences: list[str], list of strings that shall be searched for common n_grams
        :param max_ngram: int, maximum length of ngram to consider (exclusive)
        :param min_ngram: minimum length of ngram to consider
        :return: str, common string of all sections, or None if no non-blank common ngram exists
        """
        sequences = [s for s in sequences if s]  # filter empty sequences
        if not sequences:
            return None
        seqs_ngrams = map(partial(self._allngram, min_ngram=min_ngram, max_ngram=max_ngram), sequences)
        intersection = reduce(set.intersection, seqs_ngrams)

        # default="" covers the case where no common sequence was found
        longest = max(intersection, key=len, default="")
        return longest if longest.strip() else None

__init__

__init__(clean_whitespace: bool = True, clean_header_footer: bool = False, clean_empty_lines: bool = True, split_by: str = 'word', split_length: int = 200, split_overlap: int = 0, split_answers: bool = False, split_respect_sentence_boundary: bool = True, language: str = 'en')

Parameters:

Name Type Description Default
clean_header_footer bool

Use heuristic to remove footers and headers across different pages by searching for the longest common string. This heuristic uses exact matches and therefore works well for footers like "Copyright 2019 by XXX", but won't detect "Page 3 of 4" or similar.

False
clean_whitespace bool

Strip whitespaces before or after each line in the text.

True
clean_empty_lines bool

Collapse consecutive empty lines in the text into a single empty line.

True
split_by str

Unit for splitting the document. Can be "word", "sentence", or "passage". Set to None to disable splitting.

'word'
split_length int

Maximum number of split units (e.g. words) allowed in one document. For instance, if split_length -> 10 and split_by -> "sentence", then each output document will contain at most 10 sentences.

200
split_overlap int

Overlap (in split units, e.g. words) between two adjacent documents after a split. Setting this to a positive number essentially enables the sliding window approach. For example, if split_by -> word, split_length -> 5 and split_overlap -> 2, then the splits would be like: [w1 w2 w3 w4 w5, w4 w5 w6 w7 w8, w7 w8 w9 w10 w11]. Set the value to 0 to ensure there is no overlap among the documents after splitting.

0
split_respect_sentence_boundary bool

Only applies when split_by -> word. If set to True, documents are never split in the middle of a sentence: each split contains only complete sentences and at most split_length words (except when a single sentence alone exceeds split_length).

True
language str

The language used by "nltk.tokenize.sent_tokenize" in iso639 format. Available options: "en", "es", "de", "fr" & many more.

'en'
Source code in pipelines/pipelines/nodes/preprocessor/preprocessor.py
def __init__(
    self,
    clean_whitespace: bool = True,
    clean_header_footer: bool = False,
    clean_empty_lines: bool = True,
    split_by: str = "word",
    split_length: int = 200,
    split_overlap: int = 0,
    split_answers: bool = False,
    split_respect_sentence_boundary: bool = True,
    language: str = "en",
):
    """
    :param clean_header_footer: Use heuristic to remove footers and headers across different pages by searching
                                 for the longest common string. This heuristic uses exact matches and therefore
                                 works well for footers like "Copyright 2019 by XXX", but won't detect "Page 3 of 4"
                                 or similar.
    :param clean_whitespace: Strip whitespaces before or after each line in the text.
    :param clean_empty_lines: Collapse consecutive empty lines in the text into a single empty line.
    :param split_by: Unit for splitting the document. Can be "word", "sentence", or "passage". Set to None to disable splitting.
    :param split_length: Max. number of the above split unit (e.g. words) that are allowed in one document. For instance, if
                         split_length -> 10 & split_by -> "sentence", then each output document will have at most 10 sentences.
    :param split_overlap: Overlap (in split units, e.g. words) between two adjacent documents after a split.
                          Setting this to a positive number essentially enables the sliding window approach.
                          For example, if split_by -> `word`,
                          split_length -> 5 & split_overlap -> 2, then the splits would be like:
                          [w1 w2 w3 w4 w5, w4 w5 w6 w7 w8, w7 w8 w9 w10 w11].
                          Set the value to 0 to ensure there is no overlap among the documents after splitting.
    :param split_answers: Treat the input as FAQ data ("question\\tanswer" per line when split_by -> "passage").
    :param split_respect_sentence_boundary: Only applies when split_by -> `word`. If set to True, documents are
                                            never split in the middle of a sentence: each split contains only
                                            complete sentences and at most split_length words (except when a
                                            single sentence alone exceeds split_length).
    :param language: The language used by "nltk.tokenize.sent_tokenize" in iso639 format. Available options: "en", "es", "de", "fr" & many more.
    """

    # Save init parameters to enable export of component config as YAML.
    # `language` is included so a component rebuilt from the exported config
    # tokenizes with the same language instead of silently falling back to "en".
    self.set_config(
        clean_whitespace=clean_whitespace,
        clean_header_footer=clean_header_footer,
        clean_empty_lines=clean_empty_lines,
        split_by=split_by,
        split_length=split_length,
        split_overlap=split_overlap,
        split_answers=split_answers,
        split_respect_sentence_boundary=split_respect_sentence_boundary,
        language=language,
    )

    # Ensure the NLTK punkt sentence tokenizer is available, downloading it if missing.
    # `nltk` is checked for truthiness, presumably because it may be None when the optional
    # dependency is not installed (TODO confirm against the module's import block).
    try:
        if nltk:
            nltk.data.find("tokenizers/punkt")
    except LookupError:
        try:
            if nltk:
                nltk.download("punkt")
        except FileExistsError as error:
            logger.debug("NLTK punkt tokenizer seems to be already downloaded. Error message: %s", error)

    self.clean_whitespace = clean_whitespace
    self.clean_header_footer = clean_header_footer
    self.clean_empty_lines = clean_empty_lines
    self.split_by = split_by
    self.split_length = split_length
    self.split_overlap = split_overlap
    self.split_respect_sentence_boundary = split_respect_sentence_boundary
    self.language = language
    # Messages already logged once; used to avoid repeating the same warning.
    self.print_log: Set[str] = set()
    self.split_answers = split_answers

clean

clean(document: dict, clean_whitespace: bool, clean_header_footer: bool, clean_empty_lines: bool) -> dict

Perform document cleaning on a single document and return a single document. This method will deal with whitespaces, headers, footers and empty lines. Its exact functionality is defined by the parameters passed into `PreProcessor.__init__()`.

Source code in pipelines/pipelines/nodes/preprocessor/preprocessor.py
def clean(
    self,
    document: dict,
    clean_whitespace: bool,
    clean_header_footer: bool,
    clean_empty_lines: bool,
) -> dict:
    """
    Clean one document in place and return it.

    Depending on the flags, this removes a header/footer repeated across pages,
    strips surrounding whitespace from every line, and collapses runs of empty
    lines into a single empty line. The input dict's "content" is overwritten
    and the same dict object is returned.
    """
    content = document["content"]

    if clean_header_footer:
        content = self._find_and_remove_header_footer(
            content, n_chars=300, n_first_pages_to_ignore=1, n_last_pages_to_ignore=1
        )

    if clean_whitespace:
        content = "\n".join(map(str.strip, content.splitlines()))

    if clean_empty_lines:
        # two or more consecutive newlines -> exactly one blank line
        content = re.sub(r"\n\n+", "\n\n", content)

    document["content"] = content
    return document

process

process(documents: Union[dict, List[dict]], clean_whitespace: Optional[bool] = None, clean_header_footer: Optional[bool] = None, clean_empty_lines: Optional[bool] = None, split_by: Optional[str] = None, split_length: Optional[int] = None, split_overlap: Optional[int] = None, split_respect_sentence_boundary: Optional[bool] = None) -> List[dict]

Perform document cleaning and splitting. Can take a single document or a list of documents as input and returns a list of documents.

Source code in pipelines/pipelines/nodes/preprocessor/preprocessor.py
def process(
    self,
    documents: Union[dict, List[dict]],
    clean_whitespace: Optional[bool] = None,
    clean_header_footer: Optional[bool] = None,
    clean_empty_lines: Optional[bool] = None,
    split_by: Optional[str] = None,
    split_length: Optional[int] = None,
    split_overlap: Optional[int] = None,
    split_respect_sentence_boundary: Optional[bool] = None,
    split_answers: Optional[bool] = None,
) -> List[dict]:
    """
    Perform document cleaning and splitting. Can take a single document or a list of documents as input and returns a list of documents.

    Every option left as None is forwarded as None, letting `_process_single` fall back to the
    instance defaults.

    :raises Exception: if `documents` is neither a dict nor a list.
    """

    kwargs = {
        "clean_whitespace": clean_whitespace,
        "clean_header_footer": clean_header_footer,
        "clean_empty_lines": clean_empty_lines,
        "split_by": split_by,
        "split_length": split_length,
        "split_overlap": split_overlap,
        "split_answers": split_answers,
        "split_respect_sentence_boundary": split_respect_sentence_boundary,
    }

    # isinstance() (rather than type() ==) also accepts dict/list subclasses such as OrderedDict.
    if isinstance(documents, dict):
        return self._process_single(document=documents, **kwargs)  # type: ignore
    if isinstance(documents, list):
        return self._process_batch(documents=list(documents), **kwargs)
    raise Exception("documents provided to PreProcessor.process() are neither a list nor a dict")

split

split(document: dict, split_by: str, split_length: int, split_overlap: int, split_answers: bool, split_respect_sentence_boundary: bool) -> List[dict]

Perform document splitting on a single document. This method can split on different units, at different lengths, with different strides. It can also respect sentence boundaries. Its exact functionality is defined by the parameters passed into `PreProcessor.__init__()`. Takes a single document as input and returns a list of documents.

Source code in pipelines/pipelines/nodes/preprocessor/preprocessor.py
def split(
    self,
    document: dict,
    split_by: str,
    split_length: int,
    split_overlap: int,
    split_answers: bool,
    split_respect_sentence_boundary: bool,
) -> List[dict]:
    """Perform document splitting on a single document. This method can split on different units, at different lengths,
    with different strides. It can also respect sentence boundaries. Its exact functionality is defined by
    the parameters passed into PreProcessor.__init__(). Takes a single document as input and returns a list of documents.

    Args:
        document: dict with a "content" string and optionally a "meta" dict. It is deep-copied for
            every output document and never mutated.
        split_by: "passage" (blank-line separated), "sentence" (NLTK sentence tokenizer) or "word"
            (space separated). A falsy value disables splitting and returns ``[document]``.
        split_length: maximum number of units per split.
        split_overlap: number of units shared by consecutive splits.
        split_answers: treat the content as FAQ text with one tab-separated "question, answer" row per
            line. Supported with split_by="passage", or with split_by="word" plus
            split_respect_sentence_boundary=True.
        split_respect_sentence_boundary: pack whole sentences into splits of at most split_length
            words (split_by="word" only). When ``self.language == "chinese"`` the text is iterated
            character by character and each character counts as one unit.

    Returns:
        One document per non-empty split, with ``meta["_split_id"]`` set to the split's index
        (and ``meta["answer"]`` for FAQ rows; rows without a tab are skipped).

    Raises:
        Exception: split_by is set without split_length, or an FAQ row has more than two columns.
        NotImplementedError: unsupported split_by, sentence-boundary splitting with a split_by other
            than "word", or split_answers combined with a mode that has no FAQ row handling.
    """
    if not split_by:
        return [document]

    if not split_length:
        raise Exception("split_length needs be set when using split_by.")

    if split_respect_sentence_boundary and split_by != "word":
        raise NotImplementedError(
            "'split_respect_sentence_boundary=True' is only compatible with split_by='word'."
        )

    text = document["content"]

    if split_respect_sentence_boundary and split_by == "word":
        is_chinese = self.language == "chinese"
        if is_chinese:
            # No whitespace-delimited words: iterate the string itself, one character per unit.
            sentences = text
        else:
            language_name = iso639_to_nltk.get(self.language)
            sentences = nltk.tokenize.sent_tokenize(text, language=language_name)

        def count_units(sentence: str) -> int:
            # Characters for Chinese, space-separated words otherwise.
            return len(sentence) if is_chinese else len(sentence.split(" "))

        word_count = 0
        list_splits = []
        current_slice: List[str] = []
        for sen in sentences:
            current_word_count = count_units(sen)
            if current_word_count > split_length:
                long_sentence_message = "One or more sentence found with word count higher than the split length."
                # Warn only once per PreProcessor instance.
                if long_sentence_message not in self.print_log:
                    self.print_log.add(long_sentence_message)
                    logger.warning(long_sentence_message)
            if word_count + current_word_count > split_length:
                list_splits.append(current_slice)
                if split_overlap:
                    # Seed the next split with trailing sentences of this one until the
                    # overlap budget is reached (the last sentence taken may exceed it).
                    overlap = []
                    w_count = 0
                    for s in current_slice[::-1]:
                        if w_count >= split_overlap:
                            break
                        overlap.append(s)
                        w_count += count_units(s)
                    current_slice = list(reversed(overlap))
                    word_count = w_count
                else:
                    current_slice = []
                    word_count = 0
            current_slice.append(sen)
            word_count += current_word_count
        if current_slice:
            list_splits.append(current_slice)

        joiner = "" if is_chinese else " "
        # Empty slices (e.g. when the very first sentence is already too long) are dropped here.
        text_splits = [txt for txt in (joiner.join(sl) for sl in list_splits) if len(txt) > 0]
    else:
        if split_answers and split_by == "passage":
            # FAQ text: one "question<TAB>answer" row per line, kept whole (no fixed-length windows).
            text_splits = text.split("\n")
        else:
            # create individual "elements" of passage, sentence, or word
            if split_by == "passage":
                elements = text.split("\n\n")
            elif split_by == "sentence":
                language_name = iso639_to_nltk.get(self.language)
                elements = nltk.tokenize.sent_tokenize(text, language=language_name)
            elif split_by == "word":
                elements = text.split(" ")
            else:
                raise NotImplementedError(
                    "PreProcessor only supports 'passage', 'sentence' or 'word' split_by options."
                )

            if split_answers:
                # FAQ parsing below needs one raw row per split; no row splitting is defined for
                # this mode, so fail loudly instead of hitting an unbound text_splits.
                raise NotImplementedError(
                    "'split_answers=True' is only compatible with split_by='passage' "
                    "(or split_by='word' with split_respect_sentence_boundary=True)."
                )

            # concatenate individual elements based on split_length & split_stride
            segments = windowed(elements, n=split_length, step=split_length - split_overlap)
            text_splits = []
            for seg in segments:
                # windowed pads the last window with None.
                txt = " ".join([t for t in seg if t is not None])
                if len(txt) > 0:
                    text_splits.append(txt)

    # create new document dicts for each text split
    documents = []
    for i, txt in enumerate(text_splits):
        doc = deepcopy(document)
        doc["content"] = txt

        if "meta" not in doc.keys() or doc["meta"] is None:
            doc["meta"] = {}
        if split_answers:
            text_arr = doc["content"].split("\t")
            if len(text_arr) > 2:
                raise Exception("Each line text must be two columns and separated by \t")
            # Maybe empty lines
            if len(text_arr) == 1:
                logger.info("Some lines in your text cannot parse into question and text, maybe empty lines")
                continue
            query, answer = text_arr
            doc["content"] = query
            doc["meta"]["answer"] = answer
        doc["meta"]["_split_id"] = i
        documents.append(doc)

    return documents

pipelines.pipelines.nodes.preprocessor.text_splitter

CharacterTextSplitter

Implementation of splitting text that looks at characters.

Source code in pipelines/pipelines/nodes/preprocessor/text_splitter.py
class CharacterTextSplitter(TextSplitter):
    """Implementation of splitting text that looks at characters.

    The text is split on a single separator string (an empty separator splits it into
    individual characters); the pieces are then merged by the inherited ``_merge_splits``.
    """

    def __init__(self, separator: str = "\n\n", filters: Optional[list] = None, **kwargs: Any):
        """Create a new TextSplitter.

        Args:
            separator: default string to split on; ``split_text`` can override it per call.
            filters: stored as ``self._filter``. When omitted, each instance gets its own new
                empty list.
            **kwargs: forwarded to ``TextSplitter.__init__``.
        """
        super().__init__(**kwargs)
        self._separator = separator
        # A shared ``[]`` default would leak mutations of one instance's filters into every other.
        self._filter = [] if filters is None else filters

    def split_text(self, text: str, separator: Optional[str] = None, **kwargs) -> List[str]:
        """Split incoming text and return chunks.

        Args:
            text: text to split.
            separator: overrides ``self._separator`` for this call; ``""`` splits per character.
            **kwargs: forwarded to ``self._merge_splits``.
        """
        # First we naively split the large input into a bunch of smaller ones.
        if separator is None:
            separator = self._separator
        if separator:
            splits = text.split(separator)
        else:
            splits = list(text)
        return self._merge_splits(splits, separator, **kwargs)

__init__

__init__(separator: str = '\n\n', filters: list = [], **kwargs: Any)

Create a new TextSplitter.

Source code in pipelines/pipelines/nodes/preprocessor/text_splitter.py
def __init__(self, separator: str = "\n\n", filters: Optional[list] = None, **kwargs: Any):
    """Create a new TextSplitter.

    Args:
        separator: default string to split on; ``split_text`` can override it per call.
        filters: stored as ``self._filter``. When omitted, each instance gets its own new
            empty list.
        **kwargs: forwarded to ``TextSplitter.__init__``.
    """
    super().__init__(**kwargs)
    self._separator = separator
    # A shared ``[]`` default would leak mutations of one instance's filters into every other.
    self._filter = [] if filters is None else filters

split_text

split_text(text: str, separator: Optional[str] = None, **kwargs) -> List[str]

Split incoming text and return chunks.

Source code in pipelines/pipelines/nodes/preprocessor/text_splitter.py
def split_text(self, text: str, separator: Optional[str] = None, **kwargs) -> List[str]:
    """Split incoming text and return chunks.

    Args:
        text: text to split.
        separator: string to split on; ``None`` uses ``self._separator`` and ``""`` splits
            the text into single characters.
        **kwargs: passed through to ``self._merge_splits``.

    Returns:
        The chunks produced by ``self._merge_splits``.
    """
    sep = self._separator if separator is None else separator
    # Coarse first pass; the merge step recombines the pieces into chunks.
    pieces = text.split(sep) if sep else list(text)
    return self._merge_splits(pieces, sep, **kwargs)

HeaderType

Header type as typed dict.

Source code in pipelines/pipelines/nodes/preprocessor/text_splitter.py
class HeaderType(TypedDict):
    """Header type as typed dict.

    One markdown header entry on the header stack kept by ``MarkdownHeaderTextSplitter.split_text``.
    """

    # Nesting depth: the number of "#" characters in the header marker.
    level: int
    # Metadata key the header's text is stored under (from ``headers_to_split_on``).
    name: str
    # Header text with the marker removed and surrounding whitespace stripped.
    data: str

LineType

Line type as typed dict.

Source code in pipelines/pipelines/nodes/preprocessor/text_splitter.py
class LineType(TypedDict):
    """Line type as typed dict.

    A run of markdown text together with the header metadata in effect for it.
    """

    # Header metadata: header name (e.g. "Header 1") -> header text.
    metadata: Dict[str, str]
    # The text lines, joined with the splitter's separator.
    content: str

MarkdownHeaderTextSplitter

Implementation of splitting markdown files based on specified headers.

Source code in pipelines/pipelines/nodes/preprocessor/text_splitter.py
class MarkdownHeaderTextSplitter(BaseComponent):
    """Implementation of splitting markdown files based on specified headers.

    Lines are grouped into sections tagged with the headers they appear under. The sections are
    then packed by ``_merge_splits`` into chunks of at most ``chunk_size``, measured with
    ``length_function``.
    """

    outgoing_edges = 1

    def __init__(
        self,
        headers_to_split_on: List[Tuple[str, str]] = [
            ("#", "Header 1"),
            ("##", "Header 2"),
            ("###", "Header 3"),
            ("####", "Header 4"),
            ("#####", "Header 5"),
            ("######", "Header 6"),
        ],
        return_each_line: bool = False,
        filters: Optional[list] = None,
        chunk_size: int = 4000,
        chunk_overlap: int = 200,
        length_function: Callable[[str], int] = len,
        separator="\n",
    ):
        """Create a new MarkdownHeaderTextSplitter.

        Args:
            headers_to_split_on: (marker, metadata name) pairs of the headers to track. The
                argument is only read; a sorted copy is stored.
            return_each_line: keep every line group as its own section instead of aggregating
                consecutive groups that share the same header metadata.
            filters: strings removed from every output chunk in ``run``. When omitted, each
                instance gets its own new empty list.
            chunk_size: maximum chunk length, measured with ``length_function``.
            chunk_overlap: maximum total length of trailing sections repeated in the next chunk.
            length_function: measures text length for chunking.
            separator: string the text is split into lines on, and that chunks are joined with.
        """
        # Output line-by-line or aggregated into chunks w/ common headers
        self.return_each_line = return_each_line
        self._chunk_size = chunk_size
        # Given the headers we want to split on,
        # (e.g., "#, ##, etc") order by length
        self.headers_to_split_on = sorted(headers_to_split_on, key=lambda split: len(split[0]), reverse=True)
        # A shared ``[]`` default would leak mutations of one instance's filters into every other.
        self._filter = [] if filters is None else filters
        self._length_function = length_function
        self._separator = separator
        self._chunk_overlap = chunk_overlap

    def aggregate_lines_to_chunks(self, lines: List[LineType]) -> List[dict]:
        """Combine consecutive lines with identical metadata into chunks.

        Args:
            lines: lines of text with their associated header metadata. They are not mutated.

        Returns:
            One {"page_content": ..., "metadata": ...} dict per run of consecutive lines that
            share the same metadata. The contents within a run are joined with two spaces and
            a newline.
        """
        aggregated_chunks: List[LineType] = []

        for line in lines:
            if aggregated_chunks and aggregated_chunks[-1]["metadata"] == line["metadata"]:
                # Same metadata as the previous chunk: extend it.
                aggregated_chunks[-1]["content"] += "  \n" + line["content"]
            else:
                # Start from a copy so the += above never mutates the caller's line dicts.
                aggregated_chunks.append({"content": line["content"], "metadata": line["metadata"]})

        return [{"page_content": chunk["content"], "metadata": chunk["metadata"]} for chunk in aggregated_chunks]

    def split_text(
        self,
        text: str,
        separator: Optional[str] = None,
        chunk_size: Optional[int] = None,
        chunk_overlap: Optional[int] = None,
    ) -> List[str]:
        """Split markdown text into header-aware chunks.

        Args:
            text: Markdown text.
            separator: line separator, which is also used to re-join lines. Defaults to
                ``self._separator``.
            chunk_size: overrides ``self._chunk_size`` for this call.
            chunk_overlap: overrides ``self._chunk_overlap`` for this call.

        Returns:
            The chunk strings produced by ``self._merge_splits`` from the header-tagged sections.
        """
        if separator is None:
            separator = self._separator
        if chunk_size is None:
            chunk_size = self._chunk_size
        if chunk_overlap is None:
            chunk_overlap = self._chunk_overlap

        lines = text.split(separator)
        # Header-tagged sections, in document order.
        lines_with_metadata: List[LineType] = []
        # Buffered lines of the section being built, and the header metadata it belongs to.
        current_content: List[str] = []
        current_metadata: Dict[str, str] = {}
        # Currently open headers, outermost first.
        header_stack: List[HeaderType] = []
        # Header metadata in effect after the line just processed.
        initial_metadata: Dict[str, str] = {}

        def flush() -> None:
            # Emit the buffered lines (if any) as one section, tagged with the metadata that was
            # in effect before the line currently being processed.
            if current_content:
                lines_with_metadata.append(
                    {
                        "content": separator.join(current_content),
                        "metadata": current_metadata.copy(),
                    }
                )
                current_content.clear()

        for line in lines:
            stripped_line = line.strip()
            for sep, name in self.headers_to_split_on:
                # A header is the marker alone, or the marker followed by a space.
                if stripped_line.startswith(sep) and (
                    len(stripped_line) == len(sep) or stripped_line[len(sep)] == " "
                ):
                    if name is not None:
                        current_header_level = sep.count("#")
                        # Close headers at the same or a deeper level; they no longer apply.
                        while header_stack and header_stack[-1]["level"] >= current_header_level:
                            popped_header = header_stack.pop()
                            if popped_header["name"] in initial_metadata:
                                initial_metadata.pop(popped_header["name"])
                        header: HeaderType = {
                            "level": current_header_level,
                            "name": name,
                            "data": stripped_line[len(sep) :].strip(),
                        }
                        header_stack.append(header)
                        initial_metadata[name] = header["data"]
                    # The header line is not content itself; it ends the previous section.
                    flush()
                    break
            else:
                if stripped_line:
                    current_content.append(stripped_line)
                else:
                    # A blank line also ends the current section.
                    flush()

            current_metadata = initial_metadata.copy()
        flush()

        if not self.return_each_line:
            sections = self.aggregate_lines_to_chunks(lines_with_metadata)
        else:
            sections = [
                {"page_content": chunk["content"], "metadata": chunk["metadata"]} for chunk in lines_with_metadata
            ]
        return self._merge_splits(sections, separator, chunk_size, chunk_overlap)

    def clean(self, documents: List[dict], filters: Optional[List[str]] = None):
        """Remove every occurrence of each filter string from the documents' "content", in place.

        Args:
            documents: document dicts with a "content" string. They are mutated.
            filters: strings to delete. Defaults to ``self._filter``.

        Returns:
            The same ``documents`` list.
        """
        if filters is None:
            filters = self._filter
        for special_character in filters:
            for doc in documents:
                doc["content"] = doc["content"].replace(special_character, "")
        return documents

    def _join_docs(self, docs: List[str], separator: str) -> Optional[str]:
        """Join ``docs`` with ``separator`` and strip the result; return None if it is empty."""
        text = separator.join(docs)
        text = text.strip()
        if text == "":
            return None
        else:
            return text

    def _merge_splits(
        self,
        documents: List[dict],
        separator: Optional[str] = None,
        chunk_size: Optional[int] = None,
        chunk_overlap: Optional[int] = None,
    ) -> List[str]:
        """Greedily pack sections into chunks of at most ``chunk_size``, measured with ``self._length_function``.

        Each section ({"page_content", "metadata"}) with non-empty metadata is prefixed with the
        value of the metadata key that sorts last (the deepest header, with the default names).
        Consecutive chunks repeat trailing sections whose total length is at most ``chunk_overlap``.
        A single section longer than ``chunk_size`` is still emitted whole; a warning is logged
        when an oversized chunk is flushed.
        """
        if chunk_size is None:
            chunk_size = self._chunk_size
        if chunk_overlap is None:
            chunk_overlap = self._chunk_overlap
        if separator is None:
            separator = self._separator
        separator_len = self._length_function(separator)

        docs = []
        current_doc: List[str] = []
        total = 0
        for section in documents:
            if section["metadata"] != {}:
                head = sorted(section["metadata"].items(), key=lambda x: x[0], reverse=True)[0][1]
                d = head + separator + section["page_content"]
            else:
                d = section["page_content"]
            _len = self._length_function(d)
            if total + _len + (separator_len if len(current_doc) > 0 else 0) > chunk_size:
                if total > chunk_size:
                    logger.warning(
                        f"Created a chunk of size {total}, " f"which is longer than the specified {chunk_size}"
                    )
                if len(current_doc) > 0:
                    joined = self._join_docs(current_doc, separator)
                    if joined is not None:
                        docs.append(joined)
                    # Keep on popping if:
                    # - we have a larger chunk than in the chunk overlap
                    # - or if we still have any chunks and the length is long
                    while total > chunk_overlap or (
                        total + _len + (separator_len if len(current_doc) > 0 else 0) > chunk_size and total > 0
                    ):
                        total -= self._length_function(current_doc[0]) + (separator_len if len(current_doc) > 1 else 0)
                        current_doc = current_doc[1:]
            current_doc.append(d)
            total += _len + (separator_len if len(current_doc) > 1 else 0)
        joined = self._join_docs(current_doc, separator)
        if joined is not None:
            docs.append(joined)
        return docs

    def run(
        self,
        documents: Union[dict, List[dict]],
        meta: Optional[Union[Dict[str, str], List[Dict[str, str]]]] = None,
        filters: Optional[List[str]] = None,
        chunk_size: Optional[int] = None,
        chunk_overlap: Optional[int] = None,
        separator: Optional[str] = None,
    ):
        """Split one document or a list of documents into header-aware chunks.

        Args:
            documents: a document dict, or a list of them. Each needs a "content" string and may
                carry a "meta" dict, which is deep-copied into every chunk of that document.
            meta: not used by this method.
            filters, chunk_size, chunk_overlap, separator: per-call overrides of the instance defaults.

        Returns:
            ``({"documents": chunks}, "output_1")``. Each chunk is
            ``{"content": str, "meta": {..., "_split_id": index within its source document}}``.
            Any other type of ``documents`` yields an empty chunk list.
        """
        from copy import deepcopy

        if filters is None:
            filters = self._filter
        if chunk_size is None:
            chunk_size = self._chunk_size
        if chunk_overlap is None:
            chunk_overlap = self._chunk_overlap
        if separator is None:
            separator = self._separator

        if isinstance(documents, dict):
            documents = [documents]
        ret = []
        if isinstance(documents, list):
            for document in documents:
                source_meta = document.get("meta") or {}
                text_splits = self.split_text(document["content"], separator, chunk_size, chunk_overlap)
                for i, txt in enumerate(text_splits):
                    # Each chunk gets its own copy of the source meta so chunks do not share state.
                    doc_meta = deepcopy(source_meta)
                    doc_meta["_split_id"] = i
                    ret.append({"content": txt, "meta": doc_meta})
        if filters is not None and len(filters) > 0:
            ret = self.clean(ret, filters)
        result = {"documents": ret}
        return result, "output_1"

__init__

__init__(headers_to_split_on: List[Tuple[str, str]] = [('#', 'Header 1'), ('##', 'Header 2'), ('###', 'Header 3'), ('####', 'Header 4'), ('#####', 'Header 5'), ('######', 'Header 6')], return_each_line: bool = False, filters: list = [], chunk_size: int = 4000, chunk_overlap: int = 200, length_function: Callable[[str], int] = len, separator='\n')

Create a new MarkdownHeaderTextSplitter.

Args: `headers_to_split_on` — the headers to track; `return_each_line` — return each line with its associated headers instead of aggregating lines that share the same headers.

Source code in pipelines/pipelines/nodes/preprocessor/text_splitter.py
def __init__(
    self,
    headers_to_split_on: List[Tuple[str, str]] = [
        ("#", "Header 1"),
        ("##", "Header 2"),
        ("###", "Header 3"),
        ("####", "Header 4"),
        ("#####", "Header 5"),
        ("######", "Header 6"),
    ],
    return_each_line: bool = False,
    filters: Optional[list] = None,
    chunk_size: int = 4000,
    chunk_overlap: int = 200,
    length_function: Callable[[str], int] = len,
    separator="\n",
):
    """Create a new MarkdownHeaderTextSplitter.

    Args:
        headers_to_split_on: (marker, metadata name) pairs of the headers to track. The argument
            is only read; a copy sorted by marker length (longest first) is stored.
        return_each_line: keep every line group as its own section instead of aggregating
            consecutive groups that share the same header metadata.
        filters: stored as ``self._filter``. When omitted, each instance gets its own new empty list.
        chunk_size: maximum chunk length, measured with ``length_function``.
        chunk_overlap: maximum total length of trailing sections repeated in the next chunk.
        length_function: measures text length for chunking.
        separator: string the text is split into lines on, and that chunks are joined with.
    """
    # Output line-by-line or aggregated into chunks w/ common headers
    self.return_each_line = return_each_line
    self._chunk_size = chunk_size
    # Given the headers we want to split on,
    # (e.g., "#, ##, etc") order by length
    self.headers_to_split_on = sorted(headers_to_split_on, key=lambda split: len(split[0]), reverse=True)
    # A shared ``[]`` default would leak mutations of one instance's filters into every other.
    self._filter = [] if filters is None else filters
    self._length_function = length_function
    self._separator = separator
    self._chunk_overlap = chunk_overlap

aggregate_lines_to_chunks

aggregate_lines_to_chunks(lines: List[LineType]) -> List[dict]

Combine lines with common metadata into chunks. Args: `lines` — lines of text with their associated header metadata.

Source code in pipelines/pipelines/nodes/preprocessor/text_splitter.py
def aggregate_lines_to_chunks(self, lines: List[LineType]) -> List[dict]:
    """Combine consecutive lines with identical metadata into chunks.

    Args:
        lines: lines of text with their associated header metadata. They are not mutated.

    Returns:
        One {"page_content": ..., "metadata": ...} dict per run of consecutive lines that share
        the same metadata. The contents within a run are joined with two spaces and a newline.
    """
    aggregated_chunks: List[LineType] = []

    for line in lines:
        if aggregated_chunks and aggregated_chunks[-1]["metadata"] == line["metadata"]:
            # Same metadata as the previous chunk: extend it.
            aggregated_chunks[-1]["content"] += "  \n" + line["content"]
        else:
            # Start from a copy so the += above never mutates the caller's line dicts.
            aggregated_chunks.append({"content": line["content"], "metadata": line["metadata"]})

    return [{"page_content": chunk["content"], "metadata": chunk["metadata"]} for chunk in aggregated_chunks]

split_text

split_text(text: str, separator: Optional[str] = None, chunk_size: Optional[int] = None, chunk_overlap: Optional[int] = None) -> List[dict]

Split a markdown file. Args: `text` — the markdown text.

Source code in pipelines/pipelines/nodes/preprocessor/text_splitter.py
def split_text(
    self,
    text: str,
    separator: Optional[str] = None,
    chunk_size: Optional[int] = None,
    chunk_overlap: Optional[int] = None,
) -> List[str]:
    """Split markdown text into header-aware chunks.

    Args:
        text: Markdown text.
        separator: line separator, which is also used to re-join lines. Defaults to
            ``self._separator``.
        chunk_size: overrides ``self._chunk_size`` for this call.
        chunk_overlap: overrides ``self._chunk_overlap`` for this call.

    Returns:
        The chunk strings produced by ``self._merge_splits`` from the header-tagged sections.
    """
    if separator is None:
        separator = self._separator
    if chunk_size is None:
        chunk_size = self._chunk_size
    if chunk_overlap is None:
        chunk_overlap = self._chunk_overlap

    lines = text.split(separator)
    # Header-tagged sections, in document order.
    lines_with_metadata: List[LineType] = []
    # Buffered lines of the section being built, and the header metadata it belongs to.
    current_content: List[str] = []
    current_metadata: Dict[str, str] = {}
    # Currently open headers, outermost first.
    header_stack: List[HeaderType] = []
    # Header metadata in effect after the line just processed.
    initial_metadata: Dict[str, str] = {}

    def flush() -> None:
        # Emit the buffered lines (if any) as one section, tagged with the metadata that was in
        # effect before the line currently being processed.
        if current_content:
            lines_with_metadata.append(
                {
                    "content": separator.join(current_content),
                    "metadata": current_metadata.copy(),
                }
            )
            current_content.clear()

    for line in lines:
        stripped_line = line.strip()
        for sep, name in self.headers_to_split_on:
            # A header is the marker alone, or the marker followed by a space.
            if stripped_line.startswith(sep) and (
                len(stripped_line) == len(sep) or stripped_line[len(sep)] == " "
            ):
                if name is not None:
                    current_header_level = sep.count("#")
                    # Close headers at the same or a deeper level; they no longer apply.
                    while header_stack and header_stack[-1]["level"] >= current_header_level:
                        popped_header = header_stack.pop()
                        if popped_header["name"] in initial_metadata:
                            initial_metadata.pop(popped_header["name"])
                    header: HeaderType = {
                        "level": current_header_level,
                        "name": name,
                        "data": stripped_line[len(sep) :].strip(),
                    }
                    header_stack.append(header)
                    initial_metadata[name] = header["data"]
                # The header line is not content itself; it ends the previous section.
                flush()
                break
        else:
            if stripped_line:
                current_content.append(stripped_line)
            else:
                # A blank line also ends the current section.
                flush()

        current_metadata = initial_metadata.copy()
    flush()

    if not self.return_each_line:
        sections = self.aggregate_lines_to_chunks(lines_with_metadata)
    else:
        sections = [
            {"page_content": chunk["content"], "metadata": chunk["metadata"]} for chunk in lines_with_metadata
        ]
    return self._merge_splits(sections, separator, chunk_size, chunk_overlap)

RecursiveCharacterTextSplitter

Implementation of splitting text that looks at characters. Recursively tries to split by different characters to find one that works.

Source code in pipelines/pipelines/nodes/preprocessor/text_splitter.py
class RecursiveCharacterTextSplitter(TextSplitter):
    """Implementation of splitting text that looks at characters.
    Recursively tries to split by different characters to find one
    that works.
    """

    def __init__(self, separators: Optional[List[str]] = None, **kwargs: Any):
        """Create a new TextSplitter.

        Args:
            separators: separators to try, in order. A missing or empty list falls back to
                paragraph break, newline, space and finally "" (per-character splitting).
            **kwargs: forwarded to ``TextSplitter.__init__``.
        """
        super().__init__(**kwargs)
        self._separators = separators or ["\n\n", "\n", " ", ""]

    def split_text(self, text: str, **kwargs) -> List[str]:
        """Split incoming text and return chunks.

        At each level, the first separator that is "" or occurs in the text is used. Pieces
        shorter than the chunk size are merged with ``self._merge_splits``; longer pieces are
        re-split with the separators that come after the one used.

        Args:
            text: text to split.
            **kwargs: optional ``chunk_size`` / ``chunk_overlap`` overrides. ``chunk_size`` is the
                length threshold at every recursion level, and both are forwarded to
                ``_merge_splits``.

        Returns:
            The chunk strings. A piece that is still too long once no separators remain is
            returned unsplit instead of recursing forever.
        """
        chunk_size = kwargs.get("chunk_size", None)
        if chunk_size is None:
            chunk_size = self._chunk_size

        def merge(pieces: List[str], separator: str) -> List[str]:
            return self._merge_splits(
                pieces,
                separator,
                chunk_size=kwargs.get("chunk_size", None),
                chunk_overlap=kwargs.get("chunk_overlap", None),
            )

        def split_recursive(piece: str, separators: List[str]) -> List[str]:
            # Pick the first usable separator; the ones before it do not occur in ``piece`` and
            # the chosen one cannot occur in its own split results, so recursion continues
            # with the separators after it.
            separator = separators[-1]
            remaining: List[str] = []
            for index, candidate in enumerate(separators):
                if candidate == "" or candidate in piece:
                    separator = candidate
                    remaining = separators[index + 1 :]
                    break
            splits = piece.split(separator) if separator else list(piece)

            chunks: List[str] = []
            good_splits: List[str] = []
            for s in splits:
                if self._length_function(s) < chunk_size:
                    good_splits.append(s)
                    continue
                if good_splits:
                    chunks.extend(merge(good_splits, separator))
                    good_splits = []
                if remaining:
                    chunks.extend(split_recursive(s, remaining))
                else:
                    # Nothing left to split on: keep the oversized piece as its own chunk.
                    chunks.append(s)
            if good_splits:
                chunks.extend(merge(good_splits, separator))
            return chunks

        return split_recursive(text, self._separators)

__init__

__init__(separators: Optional[List[str]] = None, **kwargs: Any)

Create a new TextSplitter.

Source code in pipelines/pipelines/nodes/preprocessor/text_splitter.py
def __init__(self, separators: Optional[List[str]] = None, **kwargs: Any):
    """Create a new TextSplitter.

    Args:
        separators: separators to try, in order. A missing or empty list falls back to
            paragraph break, newline, space and finally "" (per-character splitting).
        **kwargs: handed unchanged to the base ``TextSplitter`` constructor.
    """
    super().__init__(**kwargs)
    default_separators = ["\n\n", "\n", " ", ""]
    self._separators = separators if separators else default_separators

split_text

split_text(text: str, **kwargs) -> List[str]

Split incoming text and return chunks.

Source code in pipelines/pipelines/nodes/preprocessor/text_splitter.py
def split_text(self, text: str, **kwargs) -> List[str]:
    """Split incoming text and return chunks.

    At each level, the first separator that is "" or occurs in the text is used. Pieces shorter
    than the chunk size are merged with ``self._merge_splits``; longer pieces are re-split with
    the separators that come after the one used.

    Args:
        text: text to split.
        **kwargs: optional ``chunk_size`` / ``chunk_overlap`` overrides. ``chunk_size`` is the
            length threshold at every recursion level, and both are forwarded to ``_merge_splits``.

    Returns:
        The chunk strings. A piece that is still too long once no separators remain is returned
        unsplit instead of recursing forever.
    """
    chunk_size = kwargs.get("chunk_size", None)
    if chunk_size is None:
        chunk_size = self._chunk_size

    def merge(pieces: List[str], separator: str) -> List[str]:
        return self._merge_splits(
            pieces,
            separator,
            chunk_size=kwargs.get("chunk_size", None),
            chunk_overlap=kwargs.get("chunk_overlap", None),
        )

    def split_recursive(piece: str, separators: List[str]) -> List[str]:
        # Pick the first usable separator; the ones before it do not occur in ``piece`` and the
        # chosen one cannot occur in its own split results, so recursion continues with the
        # separators after it.
        separator = separators[-1]
        remaining: List[str] = []
        for index, candidate in enumerate(separators):
            if candidate == "" or candidate in piece:
                separator = candidate
                remaining = separators[index + 1 :]
                break
        splits = piece.split(separator) if separator else list(piece)

        chunks: List[str] = []
        good_splits: List[str] = []
        for s in splits:
            if self._length_function(s) < chunk_size:
                good_splits.append(s)
                continue
            if good_splits:
                chunks.extend(merge(good_splits, separator))
                good_splits = []
            if remaining:
                chunks.extend(split_recursive(s, remaining))
            else:
                # Nothing left to split on: keep the oversized piece as its own chunk.
                chunks.append(s)
        if good_splits:
            chunks.extend(merge(good_splits, separator))
        return chunks

    return split_recursive(text, self._separators)

SpacyTextSplitter

Implementation of splitting text that looks at sentences using Spacy.

Source code in pipelines/pipelines/nodes/preprocessor/text_splitter.py
class SpacyTextSplitter(TextSplitter):
    """Implementation of splitting text that looks at sentences using Spacy.

    Sentences produced by a spaCy pipeline are combined into chunks with the
    inherited ``_merge_splits``.
    """

    def __init__(self, pipeline: str = "zh_core_web_sm", **kwargs: Any) -> None:
        """Initialize the spacy text splitter.

        Args:
            pipeline: Name of the spaCy pipeline to load. If loading fails
                with ``OSError``, the pipeline is downloaded via
                ``spacy.cli.download`` and loaded again.
            **kwargs: Forwarded to the parent ``__init__``.

        Raises:
            ImportError: If spaCy is not installed.
        """
        super().__init__(**kwargs)
        try:
            import spacy
        except ImportError as err:
            raise ImportError("Spacy is not installed, please install it with `pip install spacy`.") from err
        try:
            self._tokenizer = spacy.load(pipeline)
        except OSError:
            # spacy.load raises OSError when the named pipeline is not
            # installed; download it once and retry. Other errors (and
            # KeyboardInterrupt) are no longer swallowed by a bare except.
            spacy.cli.download(pipeline)
            self._tokenizer = spacy.load(pipeline)

    def split_text(self, text: str, separator: Optional[str] = None, **kwargs) -> List[str]:
        """Split incoming text and return chunks.

        Args:
            text: The text to split into sentences.
            separator: String used to join sentences inside a chunk; ``None``
                lets ``_merge_splits`` fall back to the instance default.
            **kwargs: Forwarded to ``_merge_splits`` (``chunk_size``,
                ``chunk_overlap``).

        Returns:
            The merged chunks.
        """
        # spaCy rejects texts longer than the pipeline's max_length; raise the
        # limit just enough for this text. Compare against the pipeline's
        # actual limit, not a hard-coded default.
        if len(text) > self._tokenizer.max_length:
            self._tokenizer.max_length = len(text) + 100
        sentences = (str(sent) for sent in self._tokenizer(text).sents)
        return self._merge_splits(sentences, separator, **kwargs)

__init__

__init__(pipeline: str = 'zh_core_web_sm', **kwargs: Any) -> None

Initialize the spacy text splitter.

Source code in pipelines/pipelines/nodes/preprocessor/text_splitter.py
def __init__(self, pipeline: str = "zh_core_web_sm", **kwargs: Any) -> None:
    """Initialize the spacy text splitter.

    Args:
        pipeline: Name of the spaCy pipeline to load. If loading fails with
            ``OSError``, the pipeline is downloaded via ``spacy.cli.download``
            and loaded again.
        **kwargs: Forwarded to the parent ``__init__``.

    Raises:
        ImportError: If spaCy is not installed.
    """
    super().__init__(**kwargs)
    try:
        import spacy
    except ImportError as err:
        raise ImportError("Spacy is not installed, please install it with `pip install spacy`.") from err
    try:
        self._tokenizer = spacy.load(pipeline)
    except OSError:
        # spacy.load raises OSError when the named pipeline is not installed;
        # download it once and retry. Other errors (and KeyboardInterrupt)
        # are no longer swallowed by a bare except.
        spacy.cli.download(pipeline)
        self._tokenizer = spacy.load(pipeline)

split_text

split_text(text: str, separator: Optional[str] = None, **kwargs) -> List[str]

Split incoming text and return chunks.

Source code in pipelines/pipelines/nodes/preprocessor/text_splitter.py
def split_text(self, text: str, separator: Optional[str] = None, **kwargs) -> List[str]:
    """Split incoming text and return chunks.

    Args:
        text: The text to split into sentences.
        separator: String used to join sentences inside a chunk; ``None``
            lets ``_merge_splits`` fall back to the instance default.
        **kwargs: Forwarded to ``_merge_splits`` (``chunk_size``,
            ``chunk_overlap``).

    Returns:
        The merged chunks.
    """
    # spaCy rejects texts longer than the pipeline's max_length; raise the
    # limit just enough for this text. Compare against the pipeline's actual
    # limit, not a hard-coded default, so custom limits are respected too.
    if len(text) > self._tokenizer.max_length:
        self._tokenizer.max_length = len(text) + 100
    sentences = (str(sent) for sent in self._tokenizer(text).sents)
    return self._merge_splits(sentences, separator, **kwargs)

TextSplitter

Interface for splitting text into chunks.

Source code in pipelines/pipelines/nodes/preprocessor/text_splitter.py
class TextSplitter(BaseComponent):
    """Interface for splitting text into chunks.

    Subclasses implement :meth:`split_text`. This base class provides:

    * the chunk-merging logic (:meth:`_merge_splits`);
    * helpers that build ``{"content", "meta"}`` document dicts;
    * the pipeline entry point :meth:`run`.
    """

    outgoing_edges = 1

    def __init__(
        self,
        chunk_size: int = 4000,
        chunk_overlap: int = 200,
        length_function: Callable[[str], int] = len,
        filters: Optional[list] = None,
        separator: str = "",
    ):
        """Create a new TextSplitter.

        Args:
            chunk_size: Target maximum chunk size, measured with
                ``length_function``.
            chunk_overlap: Maximum amount of trailing content from one chunk
                that is carried over to the start of the next chunk; must not
                exceed ``chunk_size``.
            length_function: Measures the size of a piece of text.
            filters: Substrings removed from every output chunk by :meth:`run`.
                Defaults to no filters. Each instance gets its own list rather
                than a shared mutable default.
            separator: Default string used to join pieces when merging.

        Raises:
            ValueError: If ``chunk_overlap`` is larger than ``chunk_size``.
        """
        if chunk_overlap > chunk_size:
            raise ValueError(
                f"Got a larger chunk overlap ({chunk_overlap}) than chunk size " f"({chunk_size}), should be smaller."
            )
        self._chunk_size = chunk_size
        self._chunk_overlap = chunk_overlap
        self._length_function = length_function
        self._filter = [] if filters is None else filters
        self._separator = separator

    @abstractmethod
    def split_text(self, text: str, **kwargs) -> List[str]:
        """Split text into multiple components."""

    def create_documents(
        self, texts: List[str], metadatas: Optional[List[dict]] = None, separator: Optional[str] = None, **kwargs
    ) -> List[dict]:
        """Create documents from a list of texts.

        Args:
            texts: Texts to split.
            metadatas: One metadata dict per text. Each chunk gets a deep copy
                of its source text's metadata. Defaults to empty metadata.
            separator: Forwarded to :meth:`split_text` as a keyword argument.
            **kwargs: Forwarded to :meth:`split_text`.

        Returns:
            One ``{"content": chunk, "meta": ...}`` dict per chunk.
        """
        _metadatas = metadatas or [{}] * len(texts)
        documents = []
        for i, text in enumerate(texts):
            # Pass ``separator`` by keyword. The base signature only
            # guarantees ``split_text(text, **kwargs)``, so passing it
            # positionally raises TypeError for such subclasses.
            for chunk in self.split_text(text, separator=separator, **kwargs):
                documents.append({"content": chunk, "meta": copy.deepcopy(_metadatas[i])})
        return documents

    def split_documents(self, documents: List[dict], **kwargs) -> List[dict]:
        """Split documents.

        Extracts ``content`` and ``meta`` from each document and delegates to
        :meth:`create_documents`.
        """
        texts = [doc["content"] for doc in documents]
        metadatas = [doc["meta"] for doc in documents]
        return self.create_documents(texts, metadatas, **kwargs)

    def _join_docs(self, docs: List[str], separator: str, **kwargs) -> Optional[str]:
        # Join pieces and strip surrounding whitespace; an all-whitespace
        # result is reported as None so callers can skip it.
        text = separator.join(docs)
        text = text.strip()
        if text == "":
            return None
        else:
            return text

    def _merge_splits(
        self,
        splits: Iterable[str],
        separator: Optional[str],
        chunk_size: Optional[int] = None,
        chunk_overlap: Optional[int] = None,
    ) -> List[str]:
        """Greedily combine small pieces into chunks of about ``chunk_size``.

        When a chunk is emitted, leading pieces are dropped until what remains
        is at most ``chunk_overlap``; the remainder starts the next chunk. A
        warning is logged when a chunk ends up larger than ``chunk_size``.
        ``None`` arguments fall back to the instance settings.
        """
        # We now want to combine these smaller pieces into medium size
        # chunks to send to the LLM.
        if chunk_size is None:
            chunk_size = self._chunk_size
        if chunk_overlap is None:
            chunk_overlap = self._chunk_overlap
        if separator is None:
            separator = self._separator
        separator_len = self._length_function(separator)

        docs = []
        current_doc: List[str] = []
        total = 0
        for d in splits:
            _len = self._length_function(d)
            if total + _len + (separator_len if len(current_doc) > 0 else 0) > chunk_size:
                if total > chunk_size:
                    logger.warning(
                        f"Created a chunk of size {total}, " f"which is longer than the specified {chunk_size}"
                    )
                if len(current_doc) > 0:
                    doc = self._join_docs(current_doc, separator)
                    if doc is not None:
                        docs.append(doc)
                    # Keep on popping if:
                    # - we have a larger chunk than in the chunk overlap
                    # - or if we still have any chunks and the length is long
                    while total > chunk_overlap or (
                        total + _len + (separator_len if len(current_doc) > 0 else 0) > chunk_size and total > 0
                    ):
                        total -= self._length_function(current_doc[0]) + (separator_len if len(current_doc) > 1 else 0)
                        current_doc = current_doc[1:]
            current_doc.append(d)
            total += _len + (separator_len if len(current_doc) > 1 else 0)
        doc = self._join_docs(current_doc, separator)
        if doc is not None:
            docs.append(doc)
        return docs

    def clean(self, documents: List[dict], filters: List[str]):
        """Remove every occurrence of each filter string from ``content``.

        The documents are modified in place; the same list is returned.
        """
        for special_character in filters:
            for doc in documents:
                doc["content"] = doc["content"].replace(special_character, "")
        return documents

    def _split_document(self, document: dict, **split_kwargs) -> List[dict]:
        """Split one document dict into deep copies holding one chunk each."""
        chunks = []
        for i, txt in enumerate(self.split_text(document["content"], **split_kwargs)):
            doc = copy.deepcopy(document)
            doc["content"] = txt
            # Missing or None metadata becomes an empty dict so the split id
            # can always be recorded.
            if doc.get("meta") is None:
                doc["meta"] = {}
            doc["meta"]["_split_id"] = i
            chunks.append(doc)
        return chunks

    def run(  # type: ignore
        self,
        documents: Union[dict, List[dict]],
        meta: Optional[Union[Dict[str, str], List[Dict[str, str]]]] = None,  # type: ignore
        separator: Optional[str] = None,
        chunk_size: Optional[int] = None,
        chunk_overlap: Optional[int] = None,
        filters: Optional[List[str]] = None,
    ):
        """Split one document or a list of documents into chunk documents.

        Args:
            documents: A single document dict or a list of them. Each must
                have a ``content`` string. Any other input type yields no
                output documents.
            meta: Not used by this method.
            separator: Overrides the instance separator for this call.
            chunk_size: Overrides the instance chunk size for this call.
            chunk_overlap: Overrides the instance chunk overlap for this call.
            filters: Overrides the instance filters; each filter string is
                removed from every output chunk.

        Returns:
            ``({"documents": chunks}, "output_1")``. Each chunk is a deep copy
            of its source document with ``content`` replaced and
            ``meta["_split_id"]`` set to the chunk's index within that source.
        """
        if separator is None:
            separator = self._separator
        if chunk_size is None:
            chunk_size = self._chunk_size
        if chunk_overlap is None:
            chunk_overlap = self._chunk_overlap
        if filters is None:
            filters = self._filter

        if isinstance(documents, dict):  # single document
            source_docs = [documents]
        elif isinstance(documents, list):  # list document
            source_docs = documents
        else:
            source_docs = []

        ret = []
        for document in source_docs:
            ret.extend(
                self._split_document(
                    document, separator=separator, chunk_size=chunk_size, chunk_overlap=chunk_overlap
                )
            )
        if filters:
            ret = self.clean(ret, filters)
        result = {"documents": ret}
        return result, "output_1"

__init__

__init__(chunk_size: int = 4000, chunk_overlap: int = 200, length_function: Callable[[str], int] = len, filters: list = [], separator: str = '')

Create a new TextSplitter.

Source code in pipelines/pipelines/nodes/preprocessor/text_splitter.py
def __init__(
    self,
    chunk_size: int = 4000,
    chunk_overlap: int = 200,
    length_function: Callable[[str], int] = len,
    filters: Optional[list] = None,
    separator: str = "",
):
    """Create a new TextSplitter.

    Args:
        chunk_size: Target maximum chunk size, measured with
            ``length_function``.
        chunk_overlap: Maximum amount of content carried over from one chunk
            to the next; must not exceed ``chunk_size``.
        length_function: Measures the size of a piece of text.
        filters: Substrings to strip from output chunks. Defaults to no
            filters. Each instance gets its own list rather than a shared
            mutable default.
        separator: Default string used to join pieces.

    Raises:
        ValueError: If ``chunk_overlap`` is larger than ``chunk_size``.
    """
    if chunk_overlap > chunk_size:
        raise ValueError(
            f"Got a larger chunk overlap ({chunk_overlap}) than chunk size " f"({chunk_size}), should be smaller."
        )
    self._chunk_size = chunk_size
    self._chunk_overlap = chunk_overlap
    self._length_function = length_function
    self._filter = [] if filters is None else filters
    self._separator = separator

create_documents

create_documents(texts: List[str], metadatas: Optional[List[dict]] = None, separator: Optional[str] = None, **kwargs) -> List[dict]

Create documents from a list of texts.

Source code in pipelines/pipelines/nodes/preprocessor/text_splitter.py
def create_documents(
    self, texts: List[str], metadatas: Optional[List[dict]] = None, separator: Optional[str] = None, **kwargs
) -> List[dict]:
    """Create documents from a list of texts.

    Args:
        texts: Texts to split.
        metadatas: One metadata dict per text. Each chunk gets a deep copy of
            its source text's metadata. Defaults to empty metadata.
        separator: Forwarded to ``split_text`` as a keyword argument.
        **kwargs: Forwarded to ``split_text``.

    Returns:
        One ``{"content": chunk, "meta": ...}`` dict per chunk.
    """
    _metadatas = metadatas or [{}] * len(texts)
    documents = []
    for i, text in enumerate(texts):
        # Pass ``separator`` by keyword. The base signature only guarantees
        # ``split_text(text, **kwargs)``, so passing it positionally raises
        # TypeError for such implementations.
        for chunk in self.split_text(text, separator=separator, **kwargs):
            documents.append({"content": chunk, "meta": copy.deepcopy(_metadatas[i])})
    return documents

split_documents

split_documents(documents: List[dict], **kwargs) -> List[dict]

Split documents.

Source code in pipelines/pipelines/nodes/preprocessor/text_splitter.py
def split_documents(self, documents: List[dict], **kwargs) -> List[dict]:
    """Split documents.

    Collects every document's ``content`` and ``meta`` and hands them to
    ``create_documents``; extra keyword arguments pass through unchanged.
    """
    contents = [document["content"] for document in documents]
    metas = [document["meta"] for document in documents]
    return self.create_documents(contents, metas, **kwargs)

split_text abstractmethod

split_text(text: str, **kwargs) -> List[str]

Split text into multiple components.

Source code in pipelines/pipelines/nodes/preprocessor/text_splitter.py
@abstractmethod
def split_text(self, text: str, **kwargs) -> List[str]:
    """Split ``text`` into a list of chunk strings.

    Concrete splitters override this. ``**kwargs`` carries optional per-call
    settings such as ``chunk_size`` or ``chunk_overlap``.
    """
    ...