//! Vector-accelerated forward substring search for short needles, generic
//! over the `Vector` implementation chosen by the caller.

use core::mem::size_of;

use crate::memmem::{util::memcmp, vector::Vector, NeedleInfo};

/// The minimum needle length, in bytes, accepted by this searcher.
///
/// The minimum is 2: a needle of length 1 should just use memchr, and a
/// needle of length 0 isn't a case handled by this searcher.
pub(crate) const MIN_NEEDLE_LEN: usize = 2;

/// The maximum needle length, in bytes, accepted by this searcher.
///
/// In reality, there is no hard max here. The code below can handle a needle
/// of any length. (Perhaps that suggests there are missing optimizations.)
/// Instead, this limit is both a heuristic and a bound guaranteeing our
/// linear time complexity.
///
/// It is a heuristic because when a candidate match is found, memcmp is run.
/// For very large needles with lots of false positives, memcmp can make the
/// code run quite slowly.
///
/// It is a bound because the worst case behavior with memcmp is multiplicative
/// in the size of the needle and haystack, and we want to keep that additive.
/// Capping the needle length keeps the worst case additive in theory, since
/// the cap is just a constant. We aren't acting in bad faith here: memcmp on
/// tiny needles is so fast that even in pathological cases (see the
/// pathological vector benchmarks), this is still just as fast or faster in
/// practice.
///
/// This specific number was chosen by tweaking a bit and running benchmarks.
/// The rare-medium-needle benchmark, for example, gets about 5% faster by
/// using this algorithm instead of a prefilter-accelerated Two-Way. There's
/// also a theoretical desire to keep this number reasonably low, to mitigate
/// the impact of pathological cases. I did try 64, and some benchmarks got a
/// little better, and others (particularly the pathological ones) got a lot
/// worse. So... 32 it is?
pub(crate) const MAX_NEEDLE_LEN: usize = 32;

/// The implementation of the forward vector accelerated substring search.
///
/// This is extremely similar to the prefilter vector module by the same name.
/// The key difference is that this is not a prefilter. Instead, it handles
/// confirming its own matches. The trade off is that this only works with
/// smaller needles. The speed up here is that an inlined memcmp on a tiny
/// needle is very quick, even on pathological inputs. This is much better than
/// combining a prefilter with Two-Way, where using Two-Way to confirm the
/// match has higher latency.
///
/// So why not use this for all needles? We could, and it would probably work
/// really well on most inputs. But its worst case is multiplicative and we
/// want to guarantee worst case additive time. Some of the benchmarks try to
/// justify this (see the pathological ones).
///
/// The prefilter variant of this has more comments. Also note that we only
/// implement this for forward searches for now. If you have a compelling use
/// case for accelerated reverse search, please file an issue.
#[derive(Clone, Copy, Debug)]
pub(crate) struct Forward {
    /// Offset into the needle of the first byte used as a search predicate.
    /// This is the first value returned by `as_rare_ordered_u8` in
    /// `Forward::new`.
    rare1i: u8,
    /// Offset into the needle of the second byte used as a search predicate.
    /// `min_haystack_len` sizes the haystack from this offset alone, so it is
    /// presumably the larger of the two -- TODO confirm against
    /// `as_rare_ordered_u8`.
    rare2i: u8,
}

impl Forward {
    /// Builds a "generic simd" forward searcher for `needle`, using the rare
    /// byte offsets recorded in `ninfo`.
    ///
    /// Returns `None` when the needle length falls outside of
    /// `MIN_NEEDLE_LEN..=MAX_NEEDLE_LEN`, or when both rare byte offsets are
    /// the same position. (The latter likely suggests a degenerate case,
    /// although it should technically not be possible.)
    pub(crate) fn new(ninfo: &NeedleInfo, needle: &[u8]) -> Option<Forward> {
        let (rare1i, rare2i) = ninfo.rarebytes.as_rare_ordered_u8();
        let supported_len =
            (MIN_NEEDLE_LEN..=MAX_NEEDLE_LEN).contains(&needle.len());
        if supported_len && rare1i != rare2i {
            Some(Forward { rare1i, rare2i })
        } else {
            None
        }
    }

    /// Returns the smallest haystack length, in bytes, that this searcher
    /// can work with for the vector type `V`: one full vector plus the
    /// offset of the second rare byte.
    ///
    /// `fwd_find` panics when given a haystack that is shorter than this
    /// (but still at least as long as the needle).
    #[inline(always)]
    pub(crate) fn min_haystack_len<V: Vector>(&self) -> usize {
        size_of::<V>() + usize::from(self.rare2i)
    }
}

/// Returns the offset of the first occurrence of `needle` in `haystack`, or
/// `None` if there is none. The needle given should be the same needle that
/// `fwd` was built from.
///
/// The haystack is scanned one vector at a time. If the tail is shorter than
/// a full step but still long enough to hold the needle, one final chunk is
/// loaded that overlaps the previous one, and a mask hides the positions
/// that were already examined.
///
/// # Panics
///
/// When the haystack is at least as long as the needle but shorter than
/// `fwd.min_haystack_len::<V>()`. (A haystack shorter than the needle yields
/// `None` without panicking.) Indexing the needle with the rare byte offsets
/// stored in `fwd` also panics if an offset is out of bounds, e.g., when a
/// different needle is passed.
///
/// # Safety
///
/// Since this is meant to be used with vector functions, callers need to
/// specialize this inside of a function with a `target_feature` attribute.
/// Therefore, callers must ensure that whatever target feature is being used
/// supports the vector functions that this function is specialized for. (For
/// the specific vector functions used, see the Vector trait implementations.)
#[inline(always)]
pub(crate) unsafe fn fwd_find<V: Vector>(
    fwd: &Forward,
    haystack: &[u8],
    needle: &[u8],
) -> Option<usize> {
    let vector_len = size_of::<V>();

    // Ideally the meta searcher would be solely responsible for this check.
    // It is repeated here because, without it, nothing guarantees that
    // end_ptr.sub(needle.len()) stays inside the haystack. Making it part of
    // the safety contract instead would complicate the contract more than
    // necessary.
    if needle.len() > haystack.len() {
        return None;
    }
    let min_len = fwd.min_haystack_len::<V>();
    assert!(haystack.len() >= min_len, "haystack too small");
    debug_assert!(needle.len() <= haystack.len());
    debug_assert!(
        needle.len() >= MIN_NEEDLE_LEN,
        "needle must be at least {} bytes",
        MIN_NEEDLE_LEN,
    );
    debug_assert!(
        needle.len() <= MAX_NEEDLE_LEN,
        "needle must be at most {} bytes",
        MAX_NEEDLE_LEN,
    );

    // Broadcast each rare byte into every lane of its own vector.
    let rare1chunk = V::splat(needle[fwd.rare1i as usize]);
    let rare2chunk = V::splat(needle[fwd.rare2i as usize]);

    let start_ptr = haystack.as_ptr();
    // In bounds: one past the end of the haystack.
    let end_ptr = start_ptr.add(haystack.len());
    // In bounds: the assert above guarantees haystack.len() >= min_len.
    // This is the last position from which both vector loads stay inside
    // the haystack.
    let max_ptr = end_ptr.sub(min_len);

    // N.B. Unrolling this loop to process 2*size(V) and 4*size(V) bytes per
    // iteration was tried. The double unroll was marginally faster while the
    // quadruple unroll was unambiguously slower. The added complexity was
    // judged not worth it. The memmem/krate/prebuilt/huge-en/ benchmarks
    // were used for the comparison.
    let mut cur = start_ptr;
    while cur <= max_ptr {
        if let Some(chunki) = fwd_find_in_chunk(
            fwd, needle, cur, end_ptr, rare1chunk, rare2chunk, !0,
        ) {
            return Some(matched(start_ptr, cur, chunki));
        }
        cur = cur.add(vector_len);
    }

    // Nothing left over: the main loop covered the entire haystack.
    if cur >= end_ptr {
        return None;
    }
    let remaining = diff(end_ptr, cur);
    debug_assert!(
        remaining < min_len,
        "remaining bytes should be smaller than the minimum haystack \
         length of {}, but there are {} bytes remaining",
        min_len,
        remaining,
    );
    // The tail cannot contain the needle, so there is nothing more to do.
    if remaining < needle.len() {
        return None;
    }
    debug_assert!(
        max_ptr < cur,
        "after main loop, ptr should have exceeded max_ptr",
    );
    let overlap = diff(cur, max_ptr);
    debug_assert!(
        overlap > 0,
        "overlap ({}) must always be non-zero",
        overlap,
    );
    debug_assert!(
        overlap < vector_len,
        "overlap ({}) cannot possibly be >= than a vector ({})",
        overlap,
        vector_len,
    );
    // Every bit is set except for the N least significant ones, where
    // N=overlap. Candidates that fall inside the region already covered by
    // the main loop are thereby ignored automatically.
    let mask: u32 = !((1 << overlap) - 1);
    // The final chunk starts at max_ptr, backing up over the overlap so that
    // both vector loads remain in bounds.
    fwd_find_in_chunk(
        fwd, needle, max_ptr, end_ptr, rare1chunk, rare2chunk, mask,
    )
    .map(|chunki| matched(start_ptr, max_ptr, chunki))
}

/// Looks for the two rare needle bytes in the chunk starting at `ptr` and
/// confirms each candidate position with memcmp. `end_ptr` points one past
/// the end of the haystack.
///
/// Returns the chunk-relative offset of the first confirmed match. Returns
/// `None` if the chunk has no confirmed match, or as soon as a candidate is
/// reached at which the needle would extend beyond `end_ptr`.
///
/// `rare1chunk` and `rare2chunk` are vectors with the rare1 and rare2 bytes
/// repeated in each 8-bit lane, respectively.
///
/// `mask` has a bit set for every position in the chunk at which a match
/// may be considered. It only matters for the final vector load, whose
/// beginning may overlap the last load of the main loop. The mask avoids
/// revisiting positions that have already been ruled out.
///
/// # Safety
///
/// It must be safe to do an unaligned read of size(V) bytes starting at both
/// (ptr + rare1i) and (ptr + rare2i). It must also be safe to do unaligned
/// loads on ptr up to (end_ptr - needle.len()).
#[inline(always)]
unsafe fn fwd_find_in_chunk<V: Vector>(
    fwd: &Forward,
    needle: &[u8],
    ptr: *const u8,
    end_ptr: *const u8,
    rare1chunk: V,
    rare2chunk: V,
    mask: u32,
) -> Option<usize> {
    let (rare1i, rare2i) = (fwd.rare1i as usize, fwd.rare2i as usize);

    // Lane i of each comparison reports whether the haystack byte at
    // (ptr + i + rareNi) equals the corresponding rare byte.
    let hits1 = V::load_unaligned(ptr.add(rare1i)).cmpeq(rare1chunk);
    let hits2 = V::load_unaligned(ptr.add(rare2i)).cmpeq(rare2chunk);

    // One bit per lane in which both rare bytes line up, restricted to the
    // positions the caller permits.
    let mut candidates = hits1.and(hits2).movemask() & mask;
    while candidates != 0 {
        let offset = candidates.trailing_zeros() as usize;
        let candidate = ptr.add(offset);
        // The needle would run past the end of the haystack here. Later
        // candidates sit at even higher offsets, so give up entirely.
        if candidate > end_ptr.sub(needle.len()) {
            return None;
        }
        let window = core::slice::from_raw_parts(candidate, needle.len());
        if memcmp(needle, window) {
            return Some(offset);
        }
        // Clear the lowest set bit and move on to the next candidate.
        candidates &= candidates - 1;
    }
    None
}

/// Converts a chunk-relative match offset into a haystack-relative offset.
///
/// `start_ptr` is the start of the haystack, `ptr` is the start of the chunk
/// in which the match was found and `chunki` is the offset of the match
/// within that chunk.
///
/// The function of the same name in the prefilter variant of this algorithm
/// explains why it is tagged with inline(never). According to the original
/// author's notes, inlining leads to poorer codegen even here, where the
/// function is simpler. (Although it does improve some benchmarks, like
/// prebuiltiter/huge-en/common-you.)
#[cold]
#[inline(never)]
fn matched(start_ptr: *const u8, ptr: *const u8, chunki: usize) -> usize {
    let chunk_start = diff(ptr, start_ptr);
    chunk_start + chunki
}

/// Returns the distance in bytes from `b` up to `a`, computed on the
/// pointer addresses.
///
/// `a` must not be lower than `b`; this is checked only when debug
/// assertions are enabled.
fn diff(a: *const u8, b: *const u8) -> usize {
    debug_assert!(a >= b);
    let (upper, lower) = (a as usize, b as usize);
    upper - lower
}